Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete
Hi Jingsong, Although it would be nice if the accumulators in GlobalAggregateManager is fault-tolerant, we could still take advantage of managed state to guarantee the semantic and use the accumulators to implement distributed barrier or lock to solve the distributed access problem. Best, Shimin JingsongLee 于2019年9月9日周一 下午1:33写道: > Thanks jark and dian: > 1.jark's approach: do the work in task-0. Simple way. > 2.dian's approach: use StreamingRuntimeContext#getGlobalAggregateManager > Can do more operation. But these accumulators are not fault-tolerant? > > Best, > Jingsong Lee > > > -- > From:shimin yang > Send Time:2019年9月6日(星期五) 15:21 > To:dev > Subject:Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete > > Hi Fu, > > That'll be nice. > > Thanks. > > Best, > Shimin > > Dian Fu 于2019年9月6日周五 下午3:17写道: > > > Hi Shimin, > > > > It can be guaranteed to be an atomic operation. This is ensured by the > RPC > > framework. You could take a look at RpcEndpoint for more details. > > > > Regards, > > Dian > > > > > 在 2019年9月6日,下午2:35,shimin yang 写道: > > > > > > Hi Fu, > > > > > > Thank you for the remind. I think it would work in my case as long as > > it's > > > an atomic operation. > > > > > > Dian Fu 于2019年9月6日周五 下午2:22写道: > > > > > >> Hi Jingsong, > > >> > > >> Thanks for bring up this discussion. You can try to look at the > > >> GlobalAggregateManager to see if it can meet your requirements. It can > > be > > >> got via StreamingRuntimeContext#getGlobalAggregateManager(). > > >> > > >> Regards, > > >> Dian > > >> > > >>> 在 2019年9月6日,下午1:39,shimin yang 写道: > > >>> > > >>> Hi Jingsong, > > >>> > > >>> Big fan of this idea. We faced the same problem and resolved by > adding > > a > > >>> distributed lock. It would be nice to have this feature in JobMaster, > > >> which > > >>> can replace the lock. > > >>> > > >>> Best, > > >>> Shimin > > >>> > > >>> JingsongLee 于2019年9月6日周五 > 下午12:20写道: > > >>> > > Hi devs: > > > > I try to implement streaming file sink for table[1] like > > >> StreamingFileSink. > > If the underlying is a HiveFormat, or a format that updates > visibility > > through a metaStore, I have to update the metaStore in the > > notifyCheckpointComplete, but this operation occurs on the task > side, > > which will lead to distributed access to the metaStore, which will > > lead to bottleneck. > > > > So I'm curious if we can support notifyOnMaster for > > notifyCheckpointComplete like FinalizeOnMaster. > > > > What do you think? > > > > [1] > > > > >> > > > https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing > > > > Best, > > Jingsong Lee > > >> > > >> > > > > >
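For illustration, here is a minimal sketch of the "do the work in task-0" approach mentioned above, where only one subtask updates the metastore in notifyCheckpointComplete. The sink class and the commitFinishedPartitionsToMetaStore helper are hypothetical placeholders; the GlobalAggregateManager obtained via StreamingRuntimeContext#getGlobalAggregateManager() could be used instead when coordination across subtasks (e.g. a distributed barrier) is required.

```
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

// Sketch only: update metastore visibility from a single subtask so that the
// metastore is not accessed by every parallel instance on each checkpoint.
public class MetaStoreCommittingSink extends RichSinkFunction<Row> implements CheckpointListener {

    @Override
    public void invoke(Row value, Context context) {
        // write the record to the in-progress file of the current bucket (omitted)
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Only subtask 0 talks to the metastore. Alternatively, cast
        // getRuntimeContext() to StreamingRuntimeContext and use
        // getGlobalAggregateManager() to coordinate the commit across subtasks.
        if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
            commitFinishedPartitionsToMetaStore(checkpointId);
        }
    }

    // Hypothetical helper: publish finished partitions/files to the metastore.
    private void commitFinishedPartitionsToMetaStore(long checkpointId) {
        // metastore client call goes here
    }
}
```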
[DISCUSS] Retrieval services in non-high-availability scenario
Hi devs, I'd like to start a discussion thread on the topic of how we provide retrieval services in the non-high-availability scenario. To clarify terminology, the non-high-availability scenario refers to StandaloneHaServices and EmbeddedHaServices.

***The problem*** The retrieval services of the current StandaloneHaServices (pre-configured) and EmbeddedHaServices (in-memory) have their respective problems. For the pre-configured scenario, we now have a getJobManagerLeaderRetriever(JobID, defaultJMAddress) method to work around the problem that it is impossible to configure the JM address in advance. The parameter defaultJMAddress is not used by any other high-availability mode. Also, in the MiniCluster scenario and anywhere else where pre-configuring the leader address is impossible, StandaloneHaServices cannot be used. For the in-memory case, it clearly doesn't fit any distributed scenario.

***The proposal*** In order to address the inconsistency between the pre-configured retrieval services and the zookeeper-based retrieval services, we reconsider the promises provided by "non-high-availability" and regard it as a service similar to the zookeeper-based one, except that it doesn't tolerate node failures. Thus, we implement a service that acts like a standalone zookeeper cluster, named LeaderServer. A leader server is an actor that runs on the jobmanager actor system and reacts to leader contender registrations and leader retriever requests. If the jobmanager fails, the associated leader server fails too, which is where "non-high-availability" stands. In order to communicate with the leader server, we start a leader client per high-availability services instance (JM, TM, ClusterClient). When a leader election service starts, it registers the contender with the leader server via the leader client (by akka communication); when a leader retriever starts, it registers itself with the leader server via the leader client. The leader server handles leader election internally just like the Embedded implementation, and notifies retrievers with the new leader information when a new leader is elected. In this way, we unify the view of retrieval services in all scenarios: 1. Configure a name service to communicate with. In zookeeper mode it is zookeeper and in non-high-availability mode it is the leader server. 2. Any retrieval request is sent to the name service and is handled by that service. Apart from a unified view, there are other advantages: + We no longer need the special method getJobManagerLeaderRetriever(JobID, defaultJMAddress); instead, we use getJobManagerLeaderRetriever(JobID). Consequently, we need not include the JobManager address in the slot request, which might become stale during transmission. + Separated configuration concerns for launch and retrieval. JobManager address & port and REST address & port are only configured when launching a cluster (in the YARN scenario there is even no need to configure them). When retrieval is requested, one only configures the connection info of the name service (zk or leader server). + The Embedded implementation could also be included in this abstraction without any regression on multiple-leader simulation for test purposes. Actually, the leader server acts as a limited standalone zookeeper cluster. Thus, and this is where this proposal comes from, when we refactor metadata storage with the transaction store proposed in FLINK-10333, we only have to take care of the zookeeper implementation and a unified non-high-availability implementation. 
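For readers skimming the proposal, a hypothetical sketch of the LeaderServer surface is shown below; the interface and method names are illustrative only (they are not existing Flink APIs), while LeaderContender and LeaderRetrievalListener are the existing Flink interfaces.

```
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;

/**
 * Hypothetical sketch of the proposed LeaderServer, not existing Flink code.
 * The server would run as an actor inside the JobManager actor system; each
 * process (JM, TM, ClusterClient) would reach it through a local leader client.
 */
public interface LeaderServer {

    /** A contender (Dispatcher, ResourceManager, JobMaster, REST endpoint) registers under a component id. */
    void registerContender(String componentId, LeaderContender contender);

    /** A retriever registers to be notified whenever the leader of the given component changes. */
    void registerRetriever(String componentId, LeaderRetrievalListener listener);
}
```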
***Clean up*** It is also noticed that there are several stale & unimplemented high-availability services implementations which I'd like to remove to keep the codebase clean for the work on this thread and FLINK-10333. They are: - YarnHighAvailabilityServices - AbstractYarnNonHaServices - YarnIntraNonHaMasterServices - YarnPreConfiguredMasterNonHaServices - SingleLeaderElectionService - FsNegativeRunningJobsRegistry Any feedback is appreciated. Best, tison.
[jira] [Created] (FLINK-14008) Flink Scala 2.12 binary distribution contains incorrect NOTICE-binary file
Till Rohrmann created FLINK-14008: - Summary: Flink Scala 2.12 binary distribution contains incorrect NOTICE-binary file Key: FLINK-14008 URL: https://issues.apache.org/jira/browse/FLINK-14008 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 1.9.0, 1.8.1, 1.10.0 Reporter: Till Rohrmann Fix For: 1.10.0 The Flink Scala {{2.12}} binary distribution contains an incorrect NOTICE-binary file. The problem is that we don't update the Scala version of the Scala dependencies listed in the {{NOTICE-binary}} file. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14009) Cron jobs broken due to verifying incorrect NOTICE-binary file
Till Rohrmann created FLINK-14009: - Summary: Cron jobs broken due to verifying incorrect NOTICE-binary file Key: FLINK-14009 URL: https://issues.apache.org/jira/browse/FLINK-14009 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 1.10.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.8.2, 1.9.1 With FLINK-13968 we introduced an automatic {{NOTICE-binary}} file check. However, since we don't use the correct {{NOTICE-binary}} file (FLINK-14008) for Scala 2.12, it currently fails our cron jobs. I suggest only enabling the automatic {{NOTICE-binary}} file check for Scala 2.11 until FLINK-14008 has been fixed. -- This message was sent by Atlassian Jira (v8.3.2#803003)
Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC
Thanks a lot everyone for the warm welcome! Cheers, Kostas On Mon, Sep 9, 2019 at 4:54 AM Yun Gao wrote: > > Congratulations, Kostas! > > Best, > Yun > > > > > -- > From:Becket Qin > Send Time:2019 Sep. 9 (Mon.) 10:47 > To:dev > Subject:Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC > > Congrats, Kostas! > > On Sun, Sep 8, 2019 at 11:48 PM myasuka wrote: > > > Congratulations Kostas! > > > > Best > > Yun Tang > > > > > > > > -- > > Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ > > >
Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC
Congrats, Kostas! Well deserved. Best, Jingsong Lee -- From:Kostas Kloudas Send Time:2019年9月9日(星期一) 15:50 To:dev ; Yun Gao Subject:Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC Thanks a lot everyone for the warm welcome! Cheers, Kostas On Mon, Sep 9, 2019 at 4:54 AM Yun Gao wrote: > > Congratulations, Kostas! > > Best, > Yun > > > > > -- > From:Becket Qin > Send Time:2019 Sep. 9 (Mon.) 10:47 > To:dev > Subject:Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC > > Congrats, Kostas! > > On Sun, Sep 8, 2019 at 11:48 PM myasuka wrote: > > > Congratulations Kostas! > > > > Best > > Yun Tang > > > > > > > > -- > > Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ > > >
Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink
Thank you Bowen and Becket. What's the take from Flink community? Shall we wait for FLIP-27 or shall we proceed to next steps? And what the next steps are? :-) Thanks, Sijie On Thu, Sep 5, 2019 at 2:43 PM Bowen Li wrote: > Hi, > > I think having a Pulsar connector in Flink can be a good mutual benefit to > both communities. > > Another perspective is that Pulsar connector is the 1st streaming connector > that integrates with Flink's metadata management system and Catalog APIs. > It'll be cool to see how the integration turns out and whether we need to > improve Flink Catalog stack, which are currently in Beta, to cater to > streaming source/sink. Thus I'm in favor of merging Pulsar connector into > Flink 1.10. > > I'd suggest to submit smaller sized PRs, e.g. maybe one for basic > source/sink functionalities and another for schema and catalog integration, > just to make them easier to review. > > It doesn't seem to hurt to wait for FLIP-27. But I don't think FLIP-27 > should be a blocker in cases where it cannot make its way into 1.10 or > doesn't leave reasonable amount of time for committers to review or for > Pulsar connector to fully adapt to new interfaces. > > Bowen > > > > On Thu, Sep 5, 2019 at 3:21 AM Becket Qin wrote: > > > Hi Till, > > > > You are right. It all depends on when the new source interface is going > to > > be ready. Personally I think it would be there in about a month or so. > But > > I could be too optimistic. It would also be good to hear what do Aljoscha > > and Stephan think as they are also involved in FLIP-27. > > > > In general I think we should have Pulsar connector in Flink 1.10, > > preferably with the new source interface. We can also check it in right > now > > with old source interface, but I suspect few users will use it before the > > next official release. Therefore, it seems reasonable to wait a little > bit > > to see whether we can jump to the new source interface. As long as we > make > > sure Flink 1.10 has it, waiting a little bit doesn't seem to hurt much. > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > On Thu, Sep 5, 2019 at 3:59 PM Till Rohrmann > wrote: > > > > > Hi everyone, > > > > > > I'm wondering what the problem would be if we committed the Pulsar > > > connector before the new source interface is ready. If I understood it > > > correctly, then we need to support the old source interface anyway for > > the > > > existing connectors. By checking it in early I could see the benefit > that > > > our users could start using the connector earlier. Moreover, it would > > > prevent that the Pulsar integration is being delayed in case that the > > > source interface should be delayed. The only downside I see is the > extra > > > review effort and potential fixes which might be irrelevant for the new > > > source interface implementation. I guess it mainly depends on how > certain > > > we are when the new source interface will be ready. > > > > > > Cheers, > > > Till > > > > > > On Thu, Sep 5, 2019 at 8:56 AM Becket Qin > wrote: > > > > > > > Hi Sijie and Yijie, > > > > > > > > Thanks for sharing your thoughts. > > > > > > > > Just want to have some update on FLIP-27. Although the FLIP wiki and > > > > discussion thread has been quiet for some time, a few committer / > > > > contributors in Flink community were actually prototyping the entire > > > thing. 
> > > > We have made some good progress there but want to update the FLIP > wiki > > > > after the entire thing is verified to work in case there are some > last > > > > minute surprise in the implementation. I don't have an exact ETA yet, > > > but I > > > > guess it is going to be within a month or so. > > > > > > > > I am happy to review the current Flink Pulsar connector and see if it > > > would > > > > fit in FLIP-27. It would be good to avoid the case that we checked in > > the > > > > Pulsar connector with some review efforts and shortly after that the > > new > > > > Source interface is ready. > > > > > > > > Thanks, > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > On Thu, Sep 5, 2019 at 8:39 AM Yijie Shen > > > > > wrote: > > > > > > > > > Thanks for all the feedback and suggestions! > > > > > > > > > > As Sijie said, the goal of the connector has always been to provide > > > > > users with the latest features of both systems as soon as possible. > > We > > > > > propose to contribute the connector to Flink and hope to get more > > > > > suggestions and feedback from Flink experts to ensure the high > > quality > > > > > of the connector. > > > > > > > > > > For FLIP-27, we noticed its existence at the beginning of reworking > > > > > the connector implementation based on Flink 1.9; we also wanted to > > > > > build a connector that supports both batch and stream computing > based > > > > > on it. > > > > > However, it has been inactive for some time, so we decided to > provide > > > > > a connector with most of the new features, such as the new type > > system > > > > > and th
Re: Checkpointing clarification
Yes you are correct Dominik. The committed Kafka offsets tell you what the program has read as input from the Kafka topic. But depending on the actual program logic this does not mean that you have output the results of processing these input events up to this point. As you have said, there are Flink operations such as window calculations which need to buffer events for a certain period before they can emit the results. Cheers, Till On Mon, Sep 9, 2019 at 4:54 AM Paul Lam wrote: > Hi Dom, > > There are sync phase and async phase in checkpointing. When a operator > receives a barrier, it performs snapshot aka the sync phase. And when the > barriers pass through all the operators including sinks, the operators will > get a notification, after which they do the async part, like committing the > Kafka offsets. WRT your question, the offsets would only be committed when > the whole checkpoint is successfully finished. For more information, you > can refer to this post[1]. > > [1] > https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html > < > https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html > > > > Best, > Paul Lam > > > 在 2019年9月9日,07:06,Dominik Wosiński 写道: > > > > Okay, thanks for clarifying. I have some followup question here. If we > > consider Kafka offsets commits, this basically means that > > the offsets committed during the checkpoint are not necessarily the > > offsets that were really processed by the pipeline and written to sink ? > I > > mean If there is a window in the pipeline, then the records are saved in > > the window state if the window was not emitted yet, but they are > considered > > as processed, thus will not be replayed in case of restart, because Flink > > considers them as processed when committing offsets. Am I correct ? > > > > Best, > > Dom. > >
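To make the distinction concrete, here is a small sketch (Flink 1.9 universal Kafka connector; topic, broker and group id are placeholders) in which offsets are committed back to Kafka only as part of completed checkpoints, i.e. they describe what was read, not what has already been emitted by windows further downstream.

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CheckpointedKafkaSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Offsets are committed to Kafka when a checkpoint completes.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "my-group");             // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Explicit here; committing on checkpoints is the default when checkpointing is enabled.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("checkpointed-kafka-source");
    }
}
```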
[jira] [Created] (FLINK-14010) Dispatcher & JobManagers don't give up leadership when AM is shut down
TisonKun created FLINK-14010: Summary: Dispatcher & JobManagers don't give up leadership when AM is shut down Key: FLINK-14010 URL: https://issues.apache.org/jira/browse/FLINK-14010 Project: Flink Issue Type: Bug Components: Deployment / YARN, Runtime / Coordination Affects Versions: 1.9.0, 1.8.1, 1.7.2, 1.10.0 Reporter: TisonKun In the YARN deployment scenario, the YARN RM possibly launches a new AM for the job even if the previous AM has not terminated, for example, when the AMRM heartbeat times out. It is a common case that the RM sends a shutdown request to the previous AM and expects the AM to shut down properly. However, currently in {{YARNResourceManager}} we handle this request in {{onShutdownRequest}}, which simply closes the {{YARNResourceManager}} *but not the Dispatcher and JobManagers*. Thus, the Dispatcher and JobManagers launched in the new AM cannot be granted leadership properly. Visually: on the previous AM, the Dispatcher and JM leaders remain; on the new AM, only the ResourceManager is leader. Since on the client side, or in per-job mode, the JobManager address and port are configured as those of the new AM, the whole cluster goes into an unrecoverable, inconsistent state: the client queries the dispatcher on the new AM, which is not the leader. Briefly, the Dispatcher and JobManagers on the previous AM do not give up their leadership properly. -- This message was sent by Atlassian Jira (v8.3.2#803003)
Call for approving Elasticsearch 7.x connector
Hi guys, There is an issue about supporting Elasticsearch 7.x.[1] Based on our validation and discussion, we found that Elasticsearch 7.x does not guarantee API compatibility. Therefore, it is not possible to provide a universal connector like the one for Kafka. It seems that we have to provide a new connector to support Elasticsearch 7.x. Considering that Elasticsearch is a widely used system and there have been multiple user comments hoping for Elasticsearch 7.x support as soon as possible, I hope this new connector will be approved soon so that the work can be started. Best, Vino [1]: https://issues.apache.org/jira/browse/FLINK-13025
[jira] [Created] (FLINK-14011) Make some fields final and initialize them during construction in AsyncWaitOperator
Alex created FLINK-14011: Summary: Make some fields final and initialize them during construction in AsyncWaitOperator Key: FLINK-14011 URL: https://issues.apache.org/jira/browse/FLINK-14011 Project: Flink Issue Type: Improvement Components: Runtime / Task Reporter: Alex This is a small follow-up ticket to FLINK-12958. With the changes introduced there, the {{AsyncWaitOperator}} is created by {{AsyncWaitOperatorFactory}}, so some fields that are initialized in the {{setup}} method can be set up in the constructor instead. -- This message was sent by Atlassian Jira (v8.3.2#803003)
Re: [DISCUSS] StreamingFile with ParquetBulkWriter bucketing limitations
Hi Enrico, Sorry for the late reply. I think your understanding is correct. The best way to do it is to write your own ParquetBulkWriter and the corresponding factory. Out of curiosity, I guess that in the BucketingSink you were using the AvroKeyValueSinkWriter, right? Cheers, Kostas On Fri, Aug 30, 2019 at 10:23 AM Enrico Agnoli wrote: > > StreamingFile limitations > > Hi community, > > I'm working toward the porting of our code from `BucketingSink<>` to > `StreamingFileSink`. > In this case we use the sink to write AVRO via Parquet and the suggested > implementation of the Sink should be something like: > > ``` > val parquetWriterFactory = ParquetAvroWriters.forSpecificRecord(mySchemaClass) > StreamingFileSink.forBulkFormat(basePath, > parquetWriterFactory).withBucketAssigner(dataLakeBucketAssigner) > ``` > > In this design the BucketAssigner is concatenated after the bulkFormat step. > The problem that I'm having with this design is that I have an object that > contains information that should be used to construct the path and a > sub-object that contains the data to serialize. A simple example > > myClass > |- country > |- cityClass extends SpecificRecordBase) > > Let's say I receive myClass as a stream and I want to serialize the cityClass > data via the logic above. The problem is that the `forBulkFormat(..)` needs > to run on a subType of `SpecificRecordBase`, so myClass doesn't work. > If I extract cityClass from myClass then I will not have country available in > the `withBucketAssigner(..)` to be able to store the data in the right > folder... > > > Am I missing something or I do have to write my own version of the > `ParquetBulkWriter` class so to be able to handle `myClass`? > > Thanks for any idea and suggestion. > Enrico
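For reference, a rough sketch of the wrapper-type approach Kostas describes; MyClass, getCountry() and MyClassParquetWriterFactory are placeholders for the user's own types, and the custom BulkWriter.Factory (the piece replacing ParquetAvroWriters.forSpecificRecord) is only outlined, not implemented.

```
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class CountryBucketingSketch {

    /** Buckets by the outer object's country while the writer serializes the inner record. */
    public static class CountryBucketAssigner implements BucketAssigner<MyClass, String> {

        @Override
        public String getBucketId(MyClass element, Context context) {
            return element.getCountry();
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    public static StreamingFileSink<MyClass> buildSink(Path basePath) {
        // MyClassParquetWriterFactory would be the custom BulkWriter.Factory<MyClass>
        // that unwraps the inner city record and hands it to an Avro Parquet writer,
        // i.e. the "own ParquetBulkWriter and factory" mentioned above.
        return StreamingFileSink
                .forBulkFormat(basePath, new MyClassParquetWriterFactory())
                .withBucketAssigner(new CountryBucketAssigner())
                .build();
    }
}
```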
Re: [DISCUSS] Retrieval services in non-high-availability scenario
Hi Tison, thanks for starting this discussion. I think your mail includes multiple points which are worth being treated separately (might even make sense to have separate discussion threads). Please correct me if I understood things wrongly: 1. Adding new non-ha HAServices: Based on your description I could see the "ZooKeeper-light" non-ha HAServices implementation work. Would any changes to the existing interfaces be needed? How would the LeaderServer integrate in the lifecycle of the cluster entrypoint? 2. Replacing existing non-ha HAServices with LeaderServer implementation: I'm not sure whether we need to enforce that every non-ha HAServices implementation works as you've described. I think it is pretty much an implementation detail whether the services talk to a LeaderServer or are being started with a pre-configured address. I also think that it is fair to have different implementations with different characteristics and usage scenarios. As you've said the EmbeddedHaServices are targeted for single process cluster setups and they are only used by the MiniCluster. What I like about the StandaloneHaServices is that they are dead simple (apart from the configuration). With a new implementation based on the LeaderServer, the client side implementation becomes much more complex because now one needs to handle all kind of network issues properly. Moreover, it adds more complexity to the system because it starts a new distributed component which needs to be managed. I could see that once the new implementation has matured enough that it might replace the EmbeddedHaServices. But I wouldn't start with removing them. You are right that due to the fact that we don't know the JM address before it's being started that we need to send the address with every slot request. Moreover we have the method #getJobManagerLeaderRetriever(JobID, defaultJMAddress) on the HAServices. While this is not super nice, I don't think that this is a fundamental problem at the moment. What we pay is a couple of extra bytes we need to send over the network. Configuration-wise, I'm not so sure whether we gain too much by replacing the StandaloneHaServices with the LeaderServer based implementation. For the new implementation one needs to configure a static address as well at cluster start-up time. The only benefit I can see is that we don't need to send the JM address to the RM and TMs. But as I've said, I don't think that this is a big problem for which we need to introduce new HAServices. Instead I could see that we might be able to remove it once the LeaderServer HAServices implementation has proven to be stable. 3. Configuration of HAServices: I agree that Flink's address and port configuration is not done consistently. I might make sense to group the address and port configuration under the ha service configuration section. Maybe it makes also sense to rename ha services into ServiceDiscovery because it also works in the non-ha case. it could be possible to only configure address and port if one is using the non-ha services, for example. However, this definitely deserves a separate discussion and design because one needs to check where exactly the respective configuration options are being used. I think improving the configuration of HAServices is actually orthogonal to introducing the LeaderServer HAServices implementation and could also be done for the existing HAServices. 4. Clean up of HAServices implementations: You are right that some of the existing HAServices implementations are "dead code" at the moment. 
They are the result of some implementation ideas which haven't been completed. I would suggest to start a separate discussion to discuss what to do with them. Cheers, Till On Mon, Sep 9, 2019 at 9:16 AM Zili Chen wrote: > Hi devs, > > I'd like to start a discussion thread on the topic how we provide > retrieval services in non-high-availability scenario. To clarify > terminology, non-high-availability scenario refers to > StandaloneHaServices and EmbeddedHaServices. > > ***The problem*** > > We notice that retrieval services of current StandaloneHAServices > (pre-configured) and EmbeddedHAServices(in-memory) has their > respective problems. > > For pre-configured scenario, we now have a > getJobManagerLeaderRetriever(JobID, defaultJMAddress) method > to workaround the problem that it is impossible to configure JM > address previously. The parameter defaultJMAddress is not in use in > any other defaultJMAddress with any other high-availability mode. > Also in MiniCluster scenario and anywhere else leader address > pre-configure becomes impossible, StandaloneHAServices cannot be used. > > For in-memory case, it is clearly that it doesn't fit any distributed > scenario. > > ***The proposal*** > > In order to address the inconsistency between pre-configured retrieval > services and zookeeper based retrieval services, we reconsider the > promises provided by "non-high-availability" and regard it as > similar ser
[jira] [Created] (FLINK-14012) Failed to start job for consuming Secure Kafka after the job cancel
Daebeom Lee created FLINK-14012: --- Summary: Failed to start job for consuming Secure Kafka after the job cancel Key: FLINK-14012 URL: https://issues.apache.org/jira/browse/FLINK-14012 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.9.0 Environment: * Kubernetes 1.13.2 * Flink 1.9.0 * Kafka client libary 2.2.0 Reporter: Daebeom Lee Hello, this is Daebeom Lee. h2. Background I installed Flink 1.9.0 at this our Kubernetes cluster. We use Flink session cluster. - build fatJar file and upload it at the UI, run serval jobs. At first, our jobs are good to start. But, when we cancel some jobs, the job failed This is the error code. {code:java} // code placeholder java.lang.NoClassDefFoundError: org/apache/kafka/common/security/scram/internals/ScramSaslClient at org.apache.kafka.common.security.scram.internals.ScramSaslClient$ScramSaslClientFactory.createSaslClient(ScramSaslClient.java:235) at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384) at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:180) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:176) at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.(SaslClientAuthenticator.java:168) at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:254) at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202) at org.apache.kafka.common.network.KafkaChannel.(KafkaChannel.java:140) at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:210) at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:334) at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:325) at org.apache.kafka.common.network.Selector.connect(Selector.java:257) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920) at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:474) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292) at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803) at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771) at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77) at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) {code} h2. Our workaround * I think this is a Flink JVM classloader issue. * The classloader is unloaded when the job is cancelled, and the kafka client library is included in the fatJar. * So, I placed the Kafka client library into /opt/flink/lib ** /opt/flink/lib/kafka-clients-2.2.0.jar * And then the issue was solved. * But there are weird points: ** Two weeks ago, Flink 1.8.1 had no problem. ** One week ago I rolled back from 1.9.0 to 1.8.1, and the same errors occurred. ** Maybe the docker image was changed in the docker repository ([https://github.com/docker-flink/docker-flink]) h2. Suggestion * I'd like to know the exact reason why this error occurs after upgrading to 1.9.0. * Does anybody know a better solution in this case? Thank you in advance. -- This message was sent by Atlassian Jira (v8.3.2#803003)
Re: [DISCUSS] StreamingFile with ParquetBulkWriter bucketing limitations
Thanks for confirming. We have a ``` public class ParquetSinkWriter implements Writer ``` that handles the serialization of the data. We implemented it starting from: https://medium.com/hadoop-noob/flink-parquet-writer-d127f745b519 https://stackoverflow.com/questions/48098011/how-to-use-apache-flink-write-parquet-file-on-hdfs-by-datetime-partition On 2019/09/09 09:31:03, Kostas Kloudas wrote: > Hi Enrico, > > Sorry for the late reply. I think your understanding is correct. > The best way to do it is to write your own ParquetBulkWriter and the > corresponding factory. > > Out of curiosity, I guess that in the BucketingSink you were using the > AvroKeyValueSinkWriter, right? > > Cheers, > Kostas > > On Fri, Aug 30, 2019 at 10:23 AM Enrico Agnoli > wrote: > > > > StreamingFile limitations > > > > Hi community, > > > > I'm working toward the porting of our code from `BucketingSink<>` to > > `StreamingFileSink`. > > In this case we use the sink to write AVRO via Parquet and the suggested > > implementation of the Sink should be something like: > > > > ``` > > val parquetWriterFactory = > > ParquetAvroWriters.forSpecificRecord(mySchemaClass) > > StreamingFileSink.forBulkFormat(basePath, > > parquetWriterFactory).withBucketAssigner(dataLakeBucketAssigner) > > ``` > > > > In this design the BucketAssigner is concatenated after the bulkFormat > > step. The problem that I'm having with this design is that I have an object > > that contains information that should be used to construct the path and a > > sub-object that contains the data to serialize. A simple example > > > > myClass > > |- country > > |- cityClass extends SpecificRecordBase) > > > > Let's say I receive myClass as a stream and I want to serialize the > > cityClass data via the logic above. The problem is that the > > `forBulkFormat(..)` needs to run on a subType of `SpecificRecordBase`, so > > myClass doesn't work. > > If I extract cityClass from myClass then I will not have country available > > in the `withBucketAssigner(..)` to be able to store the data in the right > > folder... > > > > > > Am I missing something or I do have to write my own version of the > > `ParquetBulkWriter` class so to be able to handle `myClass`? > > > > Thanks for any idea and suggestion. > > Enrico >
Re: [DISCUSS] FLIP-66: Support time attribute in SQL DDL
Hi all, Thanks all for so much feedbacks received in the doc so far. I saw a general agreement on using computed column to support proctime attribute and extract timestamps. So we will prepare a computed column FLIP and share in the dev ML soon. Feel free to leave more comments! Best, Jark On Fri, 6 Sep 2019 at 13:50, Dian Fu wrote: > Hi Jark, > > Thanks for bringing up this discussion and the detailed design doc. This > is definitely a critical feature for streaming SQL jobs. I have left a few > comments in the design doc. > > Thanks, > Dian > > > 在 2019年9月6日,上午11:48,Forward Xu 写道: > > > > Thanks Jark for this topic, This will be very useful. > > > > > > Best, > > > > ForwardXu > > > > Danny Chan 于2019年9月6日周五 上午11:26写道: > > > >> Thanks Jark for bring up this topic, this is definitely an import > feature > >> for the SQL, especially the DDL users. > >> > >> I would spend some time to review this design doc, really thanks. > >> > >> Best, > >> Danny Chan > >> 在 2019年9月6日 +0800 AM11:19,Jark Wu ,写道: > >>> Hi everyone, > >>> > >>> I would like to start discussion about how to support time attribute in > >> SQL > >>> DDL. > >>> In Flink 1.9, we already introduced a basic SQL DDL to create a table. > >>> However, it doesn't support to define time attributes. This makes users > >>> can't > >>> apply window operations on the tables created by DDL which is a bad > >>> experience. > >>> > >>> In FLIP-66, we propose a syntax for watermark to define rowtime > attribute > >>> and propose to use computed column syntax to define proctime attribute. > >>> But computed column is another big topic and should deserve a separate > >>> FLIP. > >>> If we have a consensus on the computed column approach, we will start > >>> computed column FLIP soon. > >>> > >>> FLIP-66: > >>> > >> > https://docs.google.com/document/d/1-SecocBqzUh7zY6HBYcfMlG_0z-JAcuZkCvsmN3LrOw/edit# > >>> > >>> Thanks for any feedback! > >>> > >>> Best, > >>> Jark > >> > >
Re: [VOTE] Release 1.8.2, release candidate #1
+1 (non-binding) - built from source successfully (mvn clean install -DskipTests) - checked gpg signature and hashes of the source release and binary release packages - All artifacts have been deployed to the maven central repository - no new dependencies were added since 1.8.1 - run a couple of tests in IDE success Regards, Dian > 在 2019年9月9日,下午2:28,jincheng sun 写道: > > +1 (binding) > > - checked signatures [SUCCESS] > - built from source without tests [SUCCESS] > - ran some tests in IDE [SUCCESS] > - start local cluster and submit word count example [SUCCESS] > - announcement PR for website looks good! (I have left a few comments) > > Best, > Jincheng > > Jark Wu 于2019年9月6日周五 下午8:47写道: > >> Hi everyone, >> >> Please review and vote on the release candidate #1 for the version 1.8.2, >> as follows: >> [ ] +1, Approve the release >> [ ] -1, Do not approve the release (please provide specific comments) >> >> >> The complete staging area is available for your review, which includes: >> * JIRA release notes [1], >> * the official Apache source release and binary convenience releases to be >> deployed to dist.apache.org [2], which are signed with the key with >> fingerprint E2C45417BED5C104154F341085BACB5AEFAE3202 [3], >> * all artifacts to be deployed to the Maven Central Repository [4], >> * source code tag "release-1.8.2-rc1" [5], >> * website pull request listing the new release and adding announcement blog >> post [6]. >> >> The vote will be open for at least 72 hours. >> Please cast your votes before *Sep. 11th 2019, 13:00 UTC*. >> >> It is adopted by majority approval, with at least 3 PMC affirmative votes. >> >> Thanks, >> Jark >> >> [1] >> >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345670 >> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.8.2-rc1/ >> [3] https://dist.apache.org/repos/dist/release/flink/KEYS >> [4] https://repository.apache.org/content/repositories/orgapacheflink-1262 >> [5] >> >> https://github.com/apache/flink/commit/6322618bb0f1b7942d86cb1b2b7bc55290d9e330 >> [6] https://github.com/apache/flink-web/pull/262 >>
Re: [VOTE] FLIP-58: Flink Python User-Defined Function for Table API
Thanks Jincheng a lot for the remind and thanks all for the voting. I'm closing the vote now. So far, the vote has received: - 5 binding +1 votes (Jincheng, Hequn, Jark, Shaoxuan, Becket) - 5 non-binding +1 votes (Wei, Xingbo, Terry, Yu, Jeff) - No 0/-1 votes There are more than 3 binding +1 votes, no -1 votes, and the voting time has passed. According to the new bylaws, I'm glad to announce that FLIP-58 is approved. I'll update the FLIP wiki page accordingly. Thanks, Dian > 在 2019年9月9日,上午10:38,jincheng sun 写道: > > Hi all, > > This VOTE looks like everyone agrees with the current FLIP. > > Hi Time & Aljoscha Do you have any other comments after the ML discussion? > [1] > > Hi Dian, Could you announce the VOTE result and create a JIRA. for the FLIP > today later, if there no other feedback? > > Cheers, > Jincheng > > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-User-Defined-Function-for-Table-API-td31673i20.html#a32669 > > Becket Qin 于2019年9月2日周一 下午1:32写道: > >> +1 >> >> It is extremely useful for ML users. >> >> On Mon, Sep 2, 2019 at 9:46 AM Shaoxuan Wang wrote: >> >>> +1 (binding) >>> >>> This will be a great feature for Flink users, especially for the data >>> science and AI engineers. >>> >>> Regards, >>> Shaoxuan >>> >>> >>> On Fri, Aug 30, 2019 at 1:35 PM Jeff Zhang wrote: >>> +1, very looking forward this feature in flink 1.10 Yu Li 于2019年8月30日周五 上午11:08写道: > +1 (non-binding) > > Thanks for driving this! > > Best Regards, > Yu > > > On Fri, 30 Aug 2019 at 11:01, Terry Wang wrote: > >> +1. That would be very helpful. >> Best, >> Terry Wang >> >> >> >>> 在 2019年8月30日,上午10:18,Jark Wu 写道: >>> >>> +1 >>> >>> Thanks for the great work! >>> >>> On Fri, 30 Aug 2019 at 10:04, Xingbo Huang wrote: >>> Hi Dian, +1, Thanks a lot for driving this. Best, Xingbo > 在 2019年8月30日,上午9:39,Wei Zhong 写道: > > Hi Dian, > > +1 non-binding > Thanks for driving this! > > Best, Wei > >> 在 2019年8月29日,09:25,Hequn Cheng 写道: >> >> Hi Dian, >> >> +1 >> Thanks a lot for driving this. >> >> Best, Hequn >> >> On Wed, Aug 28, 2019 at 2:01 PM jincheng sun < >> sunjincheng...@gmail.com> >> wrote: >> >>> Hi Dian, >>> >>> +1, Thanks for your great job! >>> >>> Best, >>> Jincheng >>> >>> Dian Fu 于2019年8月28日周三 上午11:04写道: >>> Hi all, I'd like to start a voting thread for FLIP-58 [1] since that >>> we > have reached an agreement on the design in the discussion thread >>> [2], This vote will be open for at least 72 hours. Unless there >> is >>> an objection, I will try to close it by Sept 2, 2019 00:00 UTC >> if we >> have received sufficient votes. PS: This doesn't mean that we cannot further improve the >>> design. > We can still discuss the implementation details case by case in the JIRA > as long as it doesn't affect the overall design. [1] >>> >> > >>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Function+for+Table+API < >>> >> > >>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58:+Flink+Python+User-Defined+Function+for+Table+API > [2] >>> >> > >>> >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-User-Defined-Function-for-Table-API-td31673.html < >>> >> > >>> >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-User-Defined-Function-for-Table-API-td31673.html > Thanks, Dian >>> > >> >> > -- Best Regards Jeff Zhang >>> >>
[jira] [Created] (FLINK-14013) Support Flink Python User-Defined Stateless Function for Table
Dian Fu created FLINK-14013: --- Summary: Support Flink Python User-Defined Stateless Function for Table Key: FLINK-14013 URL: https://issues.apache.org/jira/browse/FLINK-14013 Project: Flink Issue Type: New Feature Components: API / Python, Table SQL / API Affects Versions: 1.10.0 Reporter: Dian Fu Fix For: 1.10.0 The Python Table API has been supported in release 1.9.0. See [FLIP-38|https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API] and FLINK-12308 for details. However, currently Python user-defined functions are still not supported. In this FLIP, we want to support stateless Python user-defined functions in Python Table API. More detailed description can be found in [FLIP-58|https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Stateless+Function+for+Table]. The discussion can be found in [mailing thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-User-Defined-Function-for-Table-API-td31673.html]. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14014) Introduce PythonScalarFunctionRunner to handle the communication with Python worker for Python ScalarFunction execution
Dian Fu created FLINK-14014: --- Summary: Introduce PythonScalarFunctionRunner to handle the communication with Python worker for Python ScalarFunction execution Key: FLINK-14014 URL: https://issues.apache.org/jira/browse/FLINK-14014 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 PythonScalarFunctionRunner is responsible for Python ScalarFunction execution and nothing else. So its logic should be very simple: forwarding an input element to the Python worker and fetching the execution results back. # Internally, it uses Apache Beam's portability framework for Python UDF execution and this is transparent to the caller of PythonScalarFunctionRunner # By default, each runner will start up a separate Python worker # The Python worker can run in a docker container, a separate process or even a non-managed external service. # It has the ability to execute multiple Python ScalarFunctions # It also supports chained Python ScalarFunctions -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14015) Introduce PythonScalarFunctionOperator as a standalone StreamOperator for Python ScalarFunction execution
Dian Fu created FLINK-14015: --- Summary: Introduce PythonScalarFunctionOperator as a standalone StreamOperator for Python ScalarFunction execution Key: FLINK-14015 URL: https://issues.apache.org/jira/browse/FLINK-14015 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 PythonScalarFunctionOperator is a standalone StreamOperator and it doesn't need to know how the Python ScalarFunctions are executed, which is the responsibility of PythonScalarFunctionRunner: # It is a StreamOperator which employs PythonScalarFunctionRunner for Python ScalarFunction execution # It sends input elements to PythonScalarFunctionRunner, fetches the execution results, constructs the result rows and sends them to the downstream operator # It should handle checkpoints and watermarks properly -- This message was sent by Atlassian Jira (v8.3.2#803003)
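Since the operator does not exist yet, the following is only a hypothetical skeleton (class name and runner field invented for illustration) showing how such an operator could sit on Flink's existing operator API:
{code:java}
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

// Hypothetical skeleton, not the implementation planned in this ticket.
public class PythonScalarFunctionOperatorSketch
        extends AbstractStreamOperator<Row>
        implements OneInputStreamOperator<Row, Row> {

    // Placeholder for the runner (see FLINK-14014) that owns the Python worker.
    private transient Object pythonScalarFunctionRunner;

    @Override
    public void open() throws Exception {
        super.open();
        // start the runner / Python worker here
    }

    @Override
    public void processElement(StreamRecord<Row> element) throws Exception {
        // 1) forward element.getValue() to the runner,
        // 2) fetch the execution result,
        // 3) construct the result Row and emit it via output.collect(...)
    }

    @Override
    public void close() throws Exception {
        // flush pending elements and shut the runner down
        super.close();
    }
}
{code}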
[jira] [Created] (FLINK-14016) Introduce RelNodes FlinkLogicalPythonScalarFunctionExec and DataStreamPythonScalarFunctionExec which are containers for Python PythonScalarFunctions
Dian Fu created FLINK-14016: --- Summary: Introduce RelNodes FlinkLogicalPythonScalarFunctionExec and DataStreamPythonScalarFunctionExec which are containers for Python PythonScalarFunctions Key: FLINK-14016 URL: https://issues.apache.org/jira/browse/FLINK-14016 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 Dedicated RelNodes such as FlinkLogicalPythonScalarFunctionExec and DataStreamPythonScalarFunctionExec should be introduced for Python ScalarFunction execution. These nodes exist as containers for Python ScalarFunctions which can be executed in a batch, so that we can employ the PythonScalarFunctionOperator for Python ScalarFunction execution. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14017) Support to start up Python worker in process mode
Dian Fu created FLINK-14017: --- Summary: Support to start up Python worker in process mode Key: FLINK-14017 URL: https://issues.apache.org/jira/browse/FLINK-14017 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 We employ Apache Beam's portability framework for the Python UDF execution. However, there is only a golang implementation of the boot script to start up the SDK harness in Beam. It's used by both the Python SDK harness and the Go SDK harness. This is not a problem for Beam. However, it's indeed a problem for Flink, as it means that the whole stack of Beam's Go SDK harness will be depended on if we use the golang implementation of the boot script. We want to avoid this by adding a Python boot script. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14018) Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work
Dian Fu created FLINK-14018: --- Summary: Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work Key: FLINK-14018 URL: https://issues.apache.org/jira/browse/FLINK-14018 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 We need to add a few Python building blocks such as ScalarFunctionOperation, BigIntCoder, VarcharCoder, etc. for Python ScalarFunction execution. ScalarFunctionOperation is a subclass of Operation in Beam, and BigIntCoder, VarcharCoder, etc. are subclasses of Coder in Beam. These classes will be registered into Beam's portability framework to make sure they take effect. This ticket makes sure that a basic end-to-end Python UDF can be executed. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14019) Python environment and dependency management
Dian Fu created FLINK-14019: --- Summary: Python environment and dependency management Key: FLINK-14019 URL: https://issues.apache.org/jira/browse/FLINK-14019 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 A Python user-defined function may depend on third-party packages. We should provide a proper way to handle them: # Provide a way to let users specify the dependencies # Provide a way to let users specify the Python environment to be used -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14020) Use Apache Arrow as the serializer for data transmission between Java operator and Python harness
Dian Fu created FLINK-14020: --- Summary: Use Apache Arrow as the serializer for data transmission between Java operator and Python harness Key: FLINK-14020 URL: https://issues.apache.org/jira/browse/FLINK-14020 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 Apache Arrow is "a cross-language development platform for in-memory data. It specifies a standardized language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware". It has been widely used in many notable projects, such as Spark, Parquet, Pandas, etc. We could make use of Arrow as the data serializer between the Java operator and the Python harness. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14021) Add rules to push down the Python ScalarFunctions contained in the join condition of Correlate node
Dian Fu created FLINK-14021: --- Summary: Add rules to push down the Python ScalarFunctions contained in the join condition of Correlate node Key: FLINK-14021 URL: https://issues.apache.org/jira/browse/FLINK-14021 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 The Python ScalarFunctions contained in the join condition of Correlate node should be extracted to make sure the TableFunction works well. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14022) Add validation check for places where Python ScalarFunction cannot be used
Dian Fu created FLINK-14022: --- Summary: Add validation check for places where Python ScalarFunction cannot be used Key: FLINK-14022 URL: https://issues.apache.org/jira/browse/FLINK-14022 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 Currently, there are places where Python ScalarFunctions cannot be used, for example: # Python UDFs cannot be used in MatchRecognize # Python UDFs cannot be used in a Join condition which takes columns from both the left table and the right table as inputs We should add validation checks for the places where they are not supported. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14023) Support accessing job parameters in Python user-defined functions
Dian Fu created FLINK-14023: --- Summary: Support accessing job parameters in Python user-defined functions Key: FLINK-14023 URL: https://issues.apache.org/jira/browse/FLINK-14023 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 Currently, it’s possible to access job parameters in the Java user-defined functions. It could be used to define the behavior according to job parameters. It should also be supported for Python user-defined functions. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14024) Support user-defined metrics in Python user-defined functions
Dian Fu created FLINK-14024: --- Summary: Support user-defined metrics in Python user-defined functions Key: FLINK-14024 URL: https://issues.apache.org/jira/browse/FLINK-14024 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 We should allow users to define metrics in Python user-defined functions. Beam's portability framework already provides support for metrics reporting. We could make use of it. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14025) Support to run the Python worker in docker mode
Dian Fu created FLINK-14025: --- Summary: Support to run the Python worker in docker mode Key: FLINK-14025 URL: https://issues.apache.org/jira/browse/FLINK-14025 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 The Python worker runs in "Process" mode by default. Docker mode should be supported as well. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14026) Manage the resource of Python worker properly
Dian Fu created FLINK-14026: --- Summary: Manage the resource of Python worker properly Key: FLINK-14026 URL: https://issues.apache.org/jira/browse/FLINK-14026 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 If a Flink Table API & SQL job uses Python user-defined functions, the Java operator will launch a separate Python process for Python user-defined function execution. We should make sure that the resources used by the Python process are managed by Flink's resource management framework. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14027) Add documentation for Python user-defined functions
Dian Fu created FLINK-14027: --- Summary: Add documentation for Python user-defined functions Key: FLINK-14027 URL: https://issues.apache.org/jira/browse/FLINK-14027 Project: Flink Issue Type: Sub-task Components: API / Python, Documentation Reporter: Dian Fu Fix For: 1.10.0 We should add documentation about how to use Python user-defined functions. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14028) Support logging aggregation in Python user-defined functions
Dian Fu created FLINK-14028: --- Summary: Support logging aggregation in Python user-defined functions Key: FLINK-14028 URL: https://issues.apache.org/jira/browse/FLINK-14028 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.10.0 Beam's portability framework provides the ability to collect logs from the Python workers to the operator. We should make use of this functionality to collect the logs of Python user-defined functions and output them into the log file of the Java operator process. Then users could easily access the logs generated by the Python user-defined functions. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14029) Update Flink's Mesos scheduling behavior to reject all expired offers
Piyush Narang created FLINK-14029: - Summary: Update Flink's Mesos scheduling behavior to reject all expired offers Key: FLINK-14029 URL: https://issues.apache.org/jira/browse/FLINK-14029 Project: Flink Issue Type: Bug Reporter: Piyush Narang While digging into why our Flink jobs weren't being scheduled on our internal Mesos setup we noticed that we were hitting Mesos quota limits tied to the way we've set up the Fenzo (https://github.com/Netflix/Fenzo/) library defaults in the Flink project. Behavior we noticed was that we got a bunch of offers from our Mesos master (50+) out of which only 1 or 2 of them were super skewed and took up a huge chunk of our disk resource quota. Thanks to this we were not sent any new / different offers (as our usage at the time + resource offers reached our Mesos disk quota). As the Flink / Fenzo Mesos scheduling code was not using the 1-2 skewed disk offers they end up expiring. The way we've set up the Fenzo scheduler is to use the default values on when to expire unused offers (120s) and maximum number of unused offer leases at a time (4). Unfortunately as we have a considerable number of outstanding expired offers (50+) we end up in a situation where we reject only 4 or so every 2 mins and we never get around to rejecting the super skewed disk ones which are stopping us from scheduling our Flink job. Thanks to this we end up in a situation where our job is waiting to be scheduled for more than an hour. An option to work around this is to reject all expired offers at 2 minute expiry time rather than hold on to them. This will allow Mesos to send alternate offers that might be scheduled by Fenzo. -- This message was sent by Atlassian Jira (v8.3.2#803003)
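If the change goes the way the ticket suggests, it would presumably come down to how Flink builds its Fenzo TaskScheduler. The sketch below assumes the Fenzo builder exposes withLeaseOfferExpirySecs and withRejectAllExpiredOffers (method names to be verified against the Fenzo version Flink ships with):
{code:java}
import com.netflix.fenzo.TaskScheduler;

// Sketch of the proposed behavior; the builder method names are assumptions, see above.
public class FenzoSchedulerSketch {

    public static TaskScheduler createScheduler() {
        return new TaskScheduler.Builder()
                .withLeaseOfferExpirySecs(120)   // keep the existing 2 minute expiry
                .withRejectAllExpiredOffers()    // reject all expired offers instead of at most 4
                .withLeaseRejectAction(lease -> {
                    // decline the offer back to the Mesos master so it can be re-offered
                })
                .build();
    }
}
{code}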
Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC
Congrats, Kostas! Thanks, Biao /'bɪ.aʊ/ On Mon, 9 Sep 2019 at 16:07, JingsongLee wrote: > Congrats, Kostas! Well deserved. > > Best, > Jingsong Lee > > > -- > From:Kostas Kloudas > Send Time:2019年9月9日(星期一) 15:50 > To:dev ; Yun Gao > Subject:Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC > > Thanks a lot everyone for the warm welcome! > > Cheers, > Kostas > > On Mon, Sep 9, 2019 at 4:54 AM Yun Gao > wrote: > > > > Congratulations, Kostas! > > > > Best, > > Yun > > > > > > > > > > -- > > From:Becket Qin > > Send Time:2019 Sep. 9 (Mon.) 10:47 > > To:dev > > Subject:Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC > > > > Congrats, Kostas! > > > > On Sun, Sep 8, 2019 at 11:48 PM myasuka wrote: > > > > > Congratulations Kostas! > > > > > > Best > > > Yun Tang > > > > > > > > > > > > -- > > > Sent from: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ > > > > > > >
[jira] [Created] (FLINK-14030) Nonequivalent conversion happens in Table planner
Leonard Xu created FLINK-14030: -- Summary: Nonequivalent conversion happens in Table planner Key: FLINK-14030 URL: https://issues.apache.org/jira/browse/FLINK-14030 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.0 Reporter: Leonard Xu Fix For: 2.0.0 The {{testAllApis()}} unit tests will fail because the planner makes a conversion from {{ifThenElse(isNull(plus(f0, f1)), 'null', 'not null')}} to {{CASE(OR(IS NULL($0), IS NULL($1)), _UTF-16LE'null', _UTF-16LE'not null')}} which is not an equivalence conversion. The result of the expression 'f0 + 'f1 should be null when the result overflows, even if its two operands are both not null. It's easy to reproduce as follows:
{code:scala}
testAllApis(
  'f0 + 'f1,
  "f1 + f1",
  "f1 + f1",
  "null") // the result should be null because of overflow

override def testData: Row = {
  val testData = new Row(2)
  testData.setField(0, BigDecimal("1e10").bigDecimal)
  testData.setField(1, BigDecimal("0").bigDecimal)
  testData
}

override def typeInfo: RowTypeInfo = {
  new RowTypeInfo(
    /* 0 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 10)),
    /* 1 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 28))
  )
}
{code}
-- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14031) flink-examples should add blink dependency on flink-examples-table
Jimmy Wong created FLINK-14031: -- Summary: flink-examples should add blink dependency on flink-examples-table Key: FLINK-14031 URL: https://issues.apache.org/jira/browse/FLINK-14031 Project: Flink Issue Type: Bug Components: Examples Affects Versions: 1.9.0 Reporter: Jimmy Wong Fix For: 1.9.0 The flink-examples-table module is missing the Blink planner dependency. If I run a Blink example in IntelliJ IDEA, I get this error:
{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:140)
	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:110)
	at org.apache.flink.table.api.java.StreamTableEnvironment.create(StreamTableEnvironment.java:112)
	at org.apache.flink.table.examples.java.BlinkStreamSQL.main(BlinkStreamSQL.java:19)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
{code}
*But* once I add the Blink dependency, it works fine. -- This message was sent by Atlassian Jira (v8.3.2#803003)
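For reference, the missing artifact is the Blink planner (flink-table-planner-blink, e.g. flink-table-planner-blink_2.11 for 1.9.0). Below is a minimal sketch of how such an example selects the Blink planner; it fails with the exception above whenever that planner artifact is not on the classpath (the actual BlinkStreamSQL example may differ).
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class BlinkPlannerExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Selecting the Blink planner only works if flink-table-planner-blink is on
        // the classpath; otherwise lookupExecutor() fails as shown in the trace above.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // ... register tables and run queries with tEnv
    }
}
{code}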
Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors
One thing that I just came across: Some of these options should also have a corresponding value for the JobManager, like JVM overhead, metaspace, direct memory. On Fri, Sep 6, 2019 at 4:34 AM Xintong Song wrote: > Thanks all for the votes. > So far, we have > >- 4 binding +1 votes (Stephan, Andrey, Till, Zhijiang) >- 2 un-binding +1 votes (Xintong, Yu) >- No -1 votes > > There are more than 3 binding +1 votes and no -1 votes, and the voting time > has past. According to the new bylaws, I'm happily to announce that FLIP-49 > is approved to be adopted by Apache Flink. > > Regarding the minors mentioned during the voting, if there's no objection, > I would like to update the FLIP document with the followings > >- Exclude JVM Overhead from '-XX:MaxDirectMemorySize' >- Add a 'Follow-Up' section, with the follow-ups of web ui and >documentation issues >- Add a 'Limitation' section, with the Unsafe Java 12 support issue > > > Thank you~ > > Xintong Song > > > > On Fri, Sep 6, 2019 at 10:28 AM Xintong Song > wrote: > > > +1 (non-binding) from my side. > > > > @Yu, thanks for the vote. > > - The current FLIP document already mentioned the issue that Unsafe is > not > > supported in Java 12, in the section 'Unifying Explicit and Implicit > Memory > > Allocation'. It makes sense to me to emphasize this by adding a separate > > limitation section. > > - I think we should also update the FLIP document if we change the config > > names later in PRs. But I would not consider this as a major change to > the > > FLIP that requires another vote, especially when we already agreed during > > this vote to revisit the config names at implementation stage. > > > > Thank you~ > > > > Xintong Song > > > > > > > > On Fri, Sep 6, 2019 at 2:43 AM Yu Li wrote: > > > >> +1 (non-binding) > >> > >> Minor: > >> 1. Is it worth a separate "Limitations" section to contain all relative > >> information like the Unsafe support issue in Java 12, just like many > other > >> FLIPs? > >> 2. About the config names, if we change them later in PR, does it mean > we > >> will need to update the FLIP document? If so, it seems we need another > >> vote > >> after the modification according to current bylaw? Or maybe we could > just > >> create a subpage under the FLIP and only re-vote on that part later? > >> > >> Thanks. > >> > >> Best Regards, > >> Yu > >> > >> > >> On Thu, 5 Sep 2019 at 00:52, Stephan Ewen wrote: > >> > >> > Let's not block on config key names, just go ahead and we figure this > >> out > >> > concurrently or on the PR later. > >> > > >> > > >> > On Wed, Sep 4, 2019 at 3:48 PM Stephan Ewen wrote: > >> > > >> > > Maybe to clear up confusion about my suggestion: > >> > > > >> > > I would vote to keep the name of the config parameter > >> > > "taskmanager.memory.network" because it is the same key as currently > >> > (good > >> > > to not break things unless good reason) and there currently is no > >> case or > >> > > even a concrete follow-up where we would actually differentiate > >> between > >> > > different types of network memory. > >> > > > >> > > I would suggest to not prematurely rename this because of something > >> that > >> > > might happen in the future. Experience shows that its better to do > >> these > >> > > things when the actual change comes. > >> > > > >> > > Side note: I am not sure "shuffle" is a good term in this context. I > >> have > >> > > so far only heard that in batch contexts, which is not what we do > >> here. > >> > One > >> > > more reason for me to not pre-maturely change names. 
> >> > > > >> > > On Wed, Sep 4, 2019 at 10:56 AM Xintong Song > > >> > > wrote: > >> > > > >> > >> @till > >> > >> > >> > >> > Just to clarify Xintong, you suggest that Task off-heap memory > >> > >> represents > >> > >> > direct and native memory. Since we don't know how the user will > >> > allocate > >> > >> > the memory we will add this value to -XX:MaxDirectMemorySize so > >> that > >> > the > >> > >> > process won't fail if the user allocates only direct memory and > no > >> > >> native > >> > >> > memory. Is that correct? > >> > >> > > >> > >> Yes, this is what I mean. > >> > >> > >> > >> > >> > >> Thank you~ > >> > >> > >> > >> Xintong Song > >> > >> > >> > >> > >> > >> > >> > >> On Wed, Sep 4, 2019 at 4:25 PM Till Rohrmann > > >> > >> wrote: > >> > >> > >> > >> > Just to clarify Xintong, you suggest that Task off-heap memory > >> > >> represents > >> > >> > direct and native memory. Since we don't know how the user will > >> > allocate > >> > >> > the memory we will add this value to -XX:MaxDirectMemorySize so > >> that > >> > the > >> > >> > process won't fail if the user allocates only direct memory and > no > >> > >> native > >> > >> > memory. Is that correct? > >> > >> > > >> > >> > Cheers, > >> > >> > Till > >> > >> > > >> > >> > On Wed, Sep 4, 2019 at 10:18 AM Xintong Song < > >> tonysong...@gmail.com> > >> > >> > wrote: > >> > >> > > >> > >> > > @Stephan > >> > >> > > Not sure
[jira] [Created] (FLINK-14032) Make the cache size of RocksDBPriorityQueueSetFactory configurable
Yun Tang created FLINK-14032: Summary: Make the cache size of RocksDBPriorityQueueSetFactory configurable Key: FLINK-14032 URL: https://issues.apache.org/jira/browse/FLINK-14032 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Reporter: Yun Tang Fix For: 1.10.0 Currently, the cache size of {{RocksDBPriorityQueueSetFactory}} is hard-coded to 128 and there is no way to configure it to another value. (We could increase this to obtain better performance if necessary.) Actually, this has also been a TODO for quite a long time. -- This message was sent by Atlassian Jira (v8.3.2#803003)
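A minimal sketch of what such a setting could look like is below. The config key name is purely hypothetical; only the current hard-coded default of 128 comes from the description above.
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class RocksDBTimerServiceOptions {

    /** Hypothetical option for the cache size used by RocksDBPriorityQueueSetFactory. */
    public static final ConfigOption<Integer> TIMER_SERVICE_CACHE_SIZE =
            ConfigOptions.key("state.backend.rocksdb.timer-service.cache-size")
                    .defaultValue(128)
                    .withDescription(
                            "The cache size per keyed state backend used by the "
                                    + "RocksDBPriorityQueueSetFactory for timer state.");
}
{code}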
[jira] [Created] (FLINK-14033) Distributed caches are not registered in Yarn Per Job Cluster Mode
Zhenqiu Huang created FLINK-14033: - Summary: Distributed caches are not registered in Yarn Per Job Cluster Mode Key: FLINK-14033 URL: https://issues.apache.org/jira/browse/FLINK-14033 Project: Flink Issue Type: Bug Components: Deployment / YARN Affects Versions: 1.9.0 Reporter: Zhenqiu Huang The cache files registered in StreamExecutionEnvironment are not used during job submission in YARN per-job cluster mode. Compared with job submission in session cluster mode, which uploads distributed cache files to the HTTP server in the application master, we should take the cache files from the job graph and register them in the blob store in YarnJobClusterEntrypoint. -- This message was sent by Atlassian Jira (v8.3.2#803003)
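For context, a minimal sketch of the user-facing API involved: files registered this way are the ones that reportedly do not get shipped in YARN per-job mode. The path and name below are placeholders.
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DistributedCacheExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Registered files should be shipped to every TaskManager and retrieved in a rich
        // function via getRuntimeContext().getDistributedCache().getFile("dictionary").
        env.registerCachedFile("hdfs:///path/to/dictionary.txt", "dictionary");

        env.fromElements("a", "b", "c").print();
        env.execute("distributed cache example");
    }
}
{code}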
[jira] [Created] (FLINK-14034) In FlinkKafkaProducer, KafkaTransactionState should be made public or invoke should be made final
Niels van Kaam created FLINK-14034: -- Summary: In FlinkKafkaProducer, KafkaTransactionState should be made public or invoke should be made final Key: FLINK-14034 URL: https://issues.apache.org/jira/browse/FLINK-14034 Project: Flink Issue Type: Wish Components: Connectors / Kafka Affects Versions: 1.9.0 Reporter: Niels van Kaam It is not possible to override the invoke method of FlinkKafkaProducer, because its first parameter, KafkaTransactionState, is a private inner class. It is also not possible to override the original invoke of SinkFunction, because TwoPhaseCommitSinkFunction (which FlinkKafkaProducer extends) overrides the original invoke method and marks it final. [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java] If there is a particular reason for this, I think it would be better to make the invoke method in FlinkKafkaProducer final as well and document the reason, such that it is clear this is by design (I don't see any overrides in the same package). Otherwise, I would make KafkaTransactionState publicly visible. I would like to override the invoke method to create a custom Kafka producer which performs some additional generic validations and transformations. (This can also be done in a process function, but a custom sink would simplify the code of jobs.) -- This message was sent by Atlassian Jira (v8.3.2#803003)
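A minimal sketch of the process-function workaround mentioned above, i.e. doing the generic validation/transformation in front of an unmodified FlinkKafkaProducer instead of overriding invoke(). The topic, bootstrap servers, source, and validation logic are placeholders.
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

public class ValidatingKafkaPipeline {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        DataStream<String> validated = env
                .socketTextStream("localhost", 9999)
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        // generic validations / transformations go here
                        if (value != null && !value.isEmpty()) {
                            out.collect(value.trim());
                        }
                    }
                });

        validated.addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props));

        env.execute("validating kafka pipeline");
    }
}
{code}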
[DISCUSS] modular built-in functions
Hi all,

During the discussion of how to support Hive built-in functions in Flink in FLIP-57 [1], an idea of "modular built-in functions" was brought up, with the examples of "Extension" in Postgres [2] and "Plugin" in Presto [3]. Thus I'd like to kick off a discussion to see if we should adopt such an approach. I'll try to summarize the basics of the idea:

- functions from modules (e.g. Geo, ML) can be loaded into Flink as built-in functions
- modules can be configured with an order, discovered using SPI, or set via code like "catalogManager.setFunctionModules(CoreFunctions, GeoFunctions, HiveFunctions)"
- built-in functions from external systems, like Hive, can be packaged into such a module

I took time to research Presto's Plugin and Postgres's Extension, and here are some of my findings.

Presto:
- "Presto's Catalog associated with a connector, and a catalog only contains schemas and references a data source via a connector." [4] A Presto catalog doesn't have the concept of catalog functions, thus all Presto functions don't have namespaces. Neither does Presto have function DDL [5].
- Plugins are not specific to functions - "Plugins can provide additional Connectors, Types, Functions, and System Access Control" [6]
- Thus, I feel a Plugin in Presto acts more as a "catalog", which is similar to catalogs in Flink. Since all Presto functions don't have namespaces, it probably can be seen as a built-in function module.

Postgres:
- A Postgres extension is always installed into a schema, not the entire cluster. There's a "schema_name" param in the extension creation DDL - "The name of the schema in which to install the extension's objects, given that the extension allows its contents to be relocated. The named schema must already exist. If not specified, and the extension's control file does not specify a schema either, the current default object creation schema is used." [7] Thus it also acts as a "catalog" for a schema, and functions in an extension are not built-in functions to Postgres.

Therefore, I feel the examples are not exactly the "built-in function modules" that were brought up, but feel free to correct me if I'm wrong.

Going back to the idea itself, although it seems to be a simpler concept and design in some ways, I have two concerns:

1. The major one is still name resolution - how to deal with name collisions?
- Not allowing duplicated names won't work for Hive built-in functions, as many of them share names with Flink's, so we must allow modules containing same-named functions to be registered.
- One assumption of this approach seems to be that, given modules are specified in order, functions from modules can be overridden according to that order?
- If so, how can users reference a function that is overridden in the above case (e.g. I may want to switch KMEANS between modules ML1 and ML2 with different implementations)?
- If it's supported, it seems we still need some new syntax?
- If it's not supported, that seems to be a major limitation for users.

2. The minor one is that allowing built-in functions from external systems to be accessed within Flink so widely can bring performance issues to users' jobs.
- Unlike the potential native Flink Geo or ML functions, built-in functions from external systems come with a pretty big performance penalty in Flink due to data conversions and a different invocation mechanism. Supporting Hive built-in functions is mainly for simplifying migration from Hive.
I'm not sure it makes sense when a user job has nothing to do with Hive data but unintentionally ends up using Hive built-in functions without knowing it's penalized on performance. Though docs can help to some extent, not all users really read docs in detail.

An alternative is to treat "function modules" as catalogs.

- For Flink-native function modules like Geo or ML, they can be discovered and registered automatically at runtime with a catalog name predefined in the module itself, like "ml" or "ml1", which should be unique. Their functions are considered built-in functions of that catalog, and can be referenced in some new syntax like "catalog::func", e.g. "ml::kmeans" and "ml1::kmeans".
- For built-in functions from external systems (e.g. Hive), they have to be referenced either as "catalog::func", to make sure users are explicitly expecting those external functions, or as complementary built-in functions to Flink if a config "enable_hive_built_in_functions" in HiveCatalog is turned on.

Either approach seems to have its own benefits, and I'm open for discussion and would like to hear others' opinions and use cases where a specific solution is required.

Thanks,
Bowen

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-57-Rework-FunctionCatalog-td32291.html
[2] https://www.postgresql.org/docs/10/extend-extensions.html
[3] https://prestodb.github.io/docs/current/develop/functions.html
[4] https://prestodb.github.io/docs/current/overview/concepts.html#data-sources
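To make the name-resolution question above concrete, here is a purely illustrative sketch of module-ordered resolution. None of these types exist in Flink; FunctionModule and ModularFunctionCatalog are hypothetical names, and "earlier module wins" is just one possible policy.
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical contract: a module contributes named built-in functions. */
interface FunctionModule {
    Map<String, String> listFunctions(); // function name -> implementation class name (placeholder)
}

/** Hypothetical resolver: modules are consulted in registration order. */
final class ModularFunctionCatalog {

    private final List<FunctionModule> modules;

    ModularFunctionCatalog(FunctionModule... modules) {
        this.modules = Arrays.asList(modules);
    }

    /**
     * On a name clash the earlier module wins, which is exactly the open question
     * raised above: how would a user reference the shadowed KMEANS of a later module?
     */
    Optional<String> lookupBuiltInFunction(String name) {
        for (FunctionModule module : modules) {
            String impl = module.listFunctions().get(name);
            if (impl != null) {
                return Optional.of(impl);
            }
        }
        return Optional.empty();
    }
}
{code}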
[jira] [Created] (FLINK-14035) Introduce/Change some log for snapshot to better analysis checkpoint problem
Congxian Qiu(klion26) created FLINK-14035: - Summary: Introduce/Change some log for snapshot to better analysis checkpoint problem Key: FLINK-14035 URL: https://issues.apache.org/jira/browse/FLINK-14035 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.10.0 Reporter: Congxian Qiu(klion26) Currently, the information about checkpoints is mostly logged at debug level (especially on the TM side). If we want to track the checkpoint steps and the time consumed by each step when a checkpoint fails or takes too long, we need to restart the job with debug logging enabled. This issue wants to improve that situation: change some existing logs from debug to info, and add some more debug logs. We have already changed these log levels in our production at Alibaba, and there seems to be no problem so far.

Details

Change the logs below from debug level to info:
* log about {{Starting checkpoint xxx}} on the TM side
* log about sync complete on the TM side
* log about async complete on the TM side

Add debug logs:
* log about receiving the barrier in exactly-once mode (barrier alignment), as distinct from at-least-once mode

If this issue is valid, then I'm happy to contribute it. -- This message was sent by Atlassian Jira (v8.3.2#803003)
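Purely for illustration, this is the kind of log-level promotion being proposed; the class, message, and fields below are placeholders, not the actual Flink code.
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CheckpointLogging {

    private static final Logger LOG = LoggerFactory.getLogger(CheckpointLogging.class);

    void onCheckpointStarted(long checkpointId, String taskName) {
        // today this kind of message is only visible with debug logging enabled;
        // the proposal is to promote it to info so checkpoint timing can be
        // analysed without restarting the job
        LOG.info("Starting checkpoint {} on task {}.", checkpointId, taskName);
    }
}
{code}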
Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink
Hi Sijie, If we agree that the goal is to have Pulsar connector in 1.10, how about we do the following: 0. Start a FLIP to add Pulsar connector to Flink main repo as it is a new public interface to Flink main repo. 1. Start to review the Pulsar sink right away as there is no change to the sink interface so far. 2. Wait a little bit on FLIP-27. Flink 1.10 is going to be code freeze in late Nov and let's say we give a month to the development and review of Pulsar connector, we need to have FLIP-27 by late Oct. There are still 7 weeks. Personally I think it is doable. If FLIP-27 is not ready by late Oct, we can review and check in Pulsar connector with the existing source interface. This means we will have Pulsar connector in Flink 1.10, either with or without FLIP-27. Because we are going to have Pulsar sink and source checked in separately, it might make sense to have two FLIPs, one for Pulsar sink and another for Pulsar source. And we can start the work on Pulsar sink right away. Thanks, Jiangjie (Becket) Qin On Mon, Sep 9, 2019 at 4:13 PM Sijie Guo wrote: > Thank you Bowen and Becket. > > What's the take from Flink community? Shall we wait for FLIP-27 or shall we > proceed to next steps? And what the next steps are? :-) > > Thanks, > Sijie > > On Thu, Sep 5, 2019 at 2:43 PM Bowen Li wrote: > > > Hi, > > > > I think having a Pulsar connector in Flink can be a good mutual benefit > to > > both communities. > > > > Another perspective is that Pulsar connector is the 1st streaming > connector > > that integrates with Flink's metadata management system and Catalog APIs. > > It'll be cool to see how the integration turns out and whether we need to > > improve Flink Catalog stack, which are currently in Beta, to cater to > > streaming source/sink. Thus I'm in favor of merging Pulsar connector into > > Flink 1.10. > > > > I'd suggest to submit smaller sized PRs, e.g. maybe one for basic > > source/sink functionalities and another for schema and catalog > integration, > > just to make them easier to review. > > > > It doesn't seem to hurt to wait for FLIP-27. But I don't think FLIP-27 > > should be a blocker in cases where it cannot make its way into 1.10 or > > doesn't leave reasonable amount of time for committers to review or for > > Pulsar connector to fully adapt to new interfaces. > > > > Bowen > > > > > > > > On Thu, Sep 5, 2019 at 3:21 AM Becket Qin wrote: > > > > > Hi Till, > > > > > > You are right. It all depends on when the new source interface is going > > to > > > be ready. Personally I think it would be there in about a month or so. > > But > > > I could be too optimistic. It would also be good to hear what do > Aljoscha > > > and Stephan think as they are also involved in FLIP-27. > > > > > > In general I think we should have Pulsar connector in Flink 1.10, > > > preferably with the new source interface. We can also check it in right > > now > > > with old source interface, but I suspect few users will use it before > the > > > next official release. Therefore, it seems reasonable to wait a little > > bit > > > to see whether we can jump to the new source interface. As long as we > > make > > > sure Flink 1.10 has it, waiting a little bit doesn't seem to hurt much. > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > On Thu, Sep 5, 2019 at 3:59 PM Till Rohrmann > > wrote: > > > > > > > Hi everyone, > > > > > > > > I'm wondering what the problem would be if we committed the Pulsar > > > > connector before the new source interface is ready. 
If I understood > it > > > > correctly, then we need to support the old source interface anyway > for > > > the > > > > existing connectors. By checking it in early I could see the benefit > > that > > > > our users could start using the connector earlier. Moreover, it would > > > > prevent that the Pulsar integration is being delayed in case that the > > > > source interface should be delayed. The only downside I see is the > > extra > > > > review effort and potential fixes which might be irrelevant for the > new > > > > source interface implementation. I guess it mainly depends on how > > certain > > > > we are when the new source interface will be ready. > > > > > > > > Cheers, > > > > Till > > > > > > > > On Thu, Sep 5, 2019 at 8:56 AM Becket Qin > > wrote: > > > > > > > > > Hi Sijie and Yijie, > > > > > > > > > > Thanks for sharing your thoughts. > > > > > > > > > > Just want to have some update on FLIP-27. Although the FLIP wiki > and > > > > > discussion thread has been quiet for some time, a few committer / > > > > > contributors in Flink community were actually prototyping the > entire > > > > thing. > > > > > We have made some good progress there but want to update the FLIP > > wiki > > > > > after the entire thing is verified to work in case there are some > > last > > > > > minute surprise in the implementation. I don't have an exact ETA > yet, > > > > but I > > > > > guess it is going to be within a month or so. > > > > > > >
[jira] [Created] (FLINK-14036) function log(f0,f1) in Table API do not support decimal type
Leonard Xu created FLINK-14036: -- Summary: function log(f0,f1) in Table API do not support decimal type Key: FLINK-14036 URL: https://issues.apache.org/jira/browse/FLINK-14036 Project: Flink Issue Type: Bug Components: Table SQL / API, Table SQL / Planner Affects Versions: 1.9.0 Reporter: Leonard Xu Fix For: 2.0.0 The function log(f0, f1) in the Table API module does not support the decimal type, but it works in the Table SQL module. The following code will fail:
{code:scala}
testTableApi(
  'f0.log('f1),
  "log(f0,f1)",
  "2.0")

override def testData: Row = {
  val testData = new Row(2)
  testData.setField(0, BigDecimal("3").bigDecimal)
  testData.setField(1, 9)
  testData
}

override def typeInfo: RowTypeInfo = {
  new RowTypeInfo(
    /* 0 */ fromLogicalTypeToTypeInfo(DECIMAL(1, 0)),
    /* 1 */ Types.INT
  )
}
{code}
The real cause is that the return type of the *log()* function must be Double: the planner casts all operands to Double before function execution; however, *org.apache.flink.table.planner.typeutils.TypeCoercion* cannot yet cast the Decimal type to Double. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14037) Deserializing the input/output formats failed: unread block data
liupengcheng created FLINK-14037: Summary: Deserializing the input/output formats failed: unread block data Key: FLINK-14037 URL: https://issues.apache.org/jira/browse/FLINK-14037 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.9.0 Environment: flink 1.9.0 app jar use `flink-shaded-hadoop-2` dependencies to avoid some confilicts Reporter: liupengcheng Recently, we encountered the following issue when testing flink 1.9.0: {code:java} org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 8ffbc071dda81d6f8005c02be8adde6b) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326) at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:539) at com.github.ehiggs.spark.terasort.FlinkTeraSort$.main(FlinkTeraSort.scala:89) at com.github.ehiggs.spark.terasort.FlinkTeraSort.main(FlinkTeraSort.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. 
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., (JobManagerRunner.java:152) at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375) at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) ... 7 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Dat
[jira] [Created] (FLINK-14038) ExecutionGraph deploy failed due to akka timeout
liupengcheng created FLINK-14038: Summary: ExecutionGraph deploy failed due to akka timeout Key: FLINK-14038 URL: https://issues.apache.org/jira/browse/FLINK-14038 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.9.0 Environment: Flink on yarn Flink 1.9.0 Reporter: liupengcheng When launching the flink application, the following error was reported, I downloaded the operator logs, but still have no clue. The operator logs provided no useful information and was cancelled directly. JobManager logs: {code:java} java.lang.IllegalStateException: Update task on TaskManager container_e860_1567429198842_571077_01_06 @ zjy-hadoop-prc-st320.bj (dataPort=50990) failed due to: at org.apache.flink.runtime.executiongraph.Execution.lambda$sendUpdatePartitionInfoRpcCall$14(Execution.java:1395) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://fl...@zjy-hadoop-prc-st320.bj:62051/user/taskmanager_0#-171547157]] after [1 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply. 
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871) at akka.dispatch.OnComplete.internal(Future.scala:263) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:
Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete
Hi Jingsong, Good point! 1. If it doesn't matter which task performs the finalize work, then I think task-0 suggested by Jark is a very good solution. 2. If it requires the last finished task to perform the finalize work, then we have to consider other solutions. WRT fault-tolerant of StreamingRuntimeContext#getGlobalAggregateManager, AFAIK, there is no built-in support. 1) Regarding to TM failover, I think it's not a problem. We can use an accumulator i.e. finish_count and it is increased by 1 when a sub-task is finished(i.e. close() method is called). When finish_count == RuntimeContext.getNumberOfParallelSubtasks() for some sub-task, then we can know that it's the last finished sub-task. This holds true even in case of TM failover. 2) Regarding to JM failover, I have no idea how to work around it so far. Maybe @Jamie Grier who is the author of this feature could share more thoughts. Not sure if there is already solution/plan to support JM failover or this feature is not designed for this kind of use case? Regards, Dian > 在 2019年9月9日,下午3:08,shimin yang 写道: > > Hi Jingsong, > > Although it would be nice if the accumulators in GlobalAggregateManager is > fault-tolerant, we could still take advantage of managed state to guarantee > the semantic and use the accumulators to implement distributed barrier or > lock to solve the distributed access problem. > > Best, > Shimin > > JingsongLee 于2019年9月9日周一 下午1:33写道: > >> Thanks jark and dian: >> 1.jark's approach: do the work in task-0. Simple way. >> 2.dian's approach: use StreamingRuntimeContext#getGlobalAggregateManager >> Can do more operation. But these accumulators are not fault-tolerant? >> >> Best, >> Jingsong Lee >> >> >> -- >> From:shimin yang >> Send Time:2019年9月6日(星期五) 15:21 >> To:dev >> Subject:Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete >> >> Hi Fu, >> >> That'll be nice. >> >> Thanks. >> >> Best, >> Shimin >> >> Dian Fu 于2019年9月6日周五 下午3:17写道: >> >>> Hi Shimin, >>> >>> It can be guaranteed to be an atomic operation. This is ensured by the >> RPC >>> framework. You could take a look at RpcEndpoint for more details. >>> >>> Regards, >>> Dian >>> 在 2019年9月6日,下午2:35,shimin yang 写道: Hi Fu, Thank you for the remind. I think it would work in my case as long as >>> it's an atomic operation. Dian Fu 于2019年9月6日周五 下午2:22写道: > Hi Jingsong, > > Thanks for bring up this discussion. You can try to look at the > GlobalAggregateManager to see if it can meet your requirements. It can >>> be > got via StreamingRuntimeContext#getGlobalAggregateManager(). > > Regards, > Dian > >> 在 2019年9月6日,下午1:39,shimin yang 写道: >> >> Hi Jingsong, >> >> Big fan of this idea. We faced the same problem and resolved by >> adding >>> a >> distributed lock. It would be nice to have this feature in JobMaster, > which >> can replace the lock. >> >> Best, >> Shimin >> >> JingsongLee 于2019年9月6日周五 >> 下午12:20写道: >> >>> Hi devs: >>> >>> I try to implement streaming file sink for table[1] like > StreamingFileSink. >>> If the underlying is a HiveFormat, or a format that updates >> visibility >>> through a metaStore, I have to update the metaStore in the >>> notifyCheckpointComplete, but this operation occurs on the task >> side, >>> which will lead to distributed access to the metaStore, which will >>> lead to bottleneck. >>> >>> So I'm curious if we can support notifyOnMaster for >>> notifyCheckpointComplete like FinalizeOnMaster. >>> >>> What do you think? 
>>> >>> [1] >>> > >>> >> https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing >>> >>> Best, >>> Jingsong Lee > > >>> >>> >>
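A minimal sketch of the finish_count idea described above, assuming the GlobalAggregateManager API mentioned in this thread (updateGlobalAggregate with a simple counting AggregateFunction). It is only illustrative and, as discussed, the aggregate lives on the JobMaster and is not checkpointed, so it does not survive JM failover.
{code:java}
import java.io.IOException;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

public class FinalizeOnLastSubtaskSink extends RichSinkFunction<String> {

    @Override
    public void invoke(String value, Context context) {
        // normal per-record sink logic ...
    }

    @Override
    public void close() throws IOException {
        StreamingRuntimeContext ctx = (StreamingRuntimeContext) getRuntimeContext();
        GlobalAggregateManager aggregateManager = ctx.getGlobalAggregateManager();

        // each finishing sub-task bumps the shared counter on the JobMaster
        int finished = aggregateManager.updateGlobalAggregate("finish_count", 1, new CountAggregate());

        if (finished == ctx.getNumberOfParallelSubtasks()) {
            // last sub-task to finish: perform the finalize work here,
            // e.g. committing partition metadata to the meta store
        }
    }

    /** Simple counting aggregate, evaluated on the JobMaster. */
    private static class CountAggregate implements AggregateFunction<Integer, Integer, Integer> {
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(Integer value, Integer accumulator) {
            return accumulator + value;
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            return a + b;
        }
    }
}
{code}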
Re: [VOTE] Release 1.8.2, release candidate #1
+1 (binding) - build from source and passed all tests locally - checked the difference between 1.8.1 and 1.8.2, no legal risk found - went through all commits checked in between 1.8.1 and 1.8.2, make sure all the issues set the proper "fixVersion" property Best, Kurt On Mon, Sep 9, 2019 at 8:45 PM Dian Fu wrote: > +1 (non-binding) > > - built from source successfully (mvn clean install -DskipTests) > - checked gpg signature and hashes of the source release and binary > release packages > - All artifacts have been deployed to the maven central repository > - no new dependencies were added since 1.8.1 > - run a couple of tests in IDE success > > Regards, > Dian > > > 在 2019年9月9日,下午2:28,jincheng sun 写道: > > > > +1 (binding) > > > > - checked signatures [SUCCESS] > > - built from source without tests [SUCCESS] > > - ran some tests in IDE [SUCCESS] > > - start local cluster and submit word count example [SUCCESS] > > - announcement PR for website looks good! (I have left a few comments) > > > > Best, > > Jincheng > > > > Jark Wu 于2019年9月6日周五 下午8:47写道: > > > >> Hi everyone, > >> > >> Please review and vote on the release candidate #1 for the version > 1.8.2, > >> as follows: > >> [ ] +1, Approve the release > >> [ ] -1, Do not approve the release (please provide specific comments) > >> > >> > >> The complete staging area is available for your review, which includes: > >> * JIRA release notes [1], > >> * the official Apache source release and binary convenience releases to > be > >> deployed to dist.apache.org [2], which are signed with the key with > >> fingerprint E2C45417BED5C104154F341085BACB5AEFAE3202 [3], > >> * all artifacts to be deployed to the Maven Central Repository [4], > >> * source code tag "release-1.8.2-rc1" [5], > >> * website pull request listing the new release and adding announcement > blog > >> post [6]. > >> > >> The vote will be open for at least 72 hours. > >> Please cast your votes before *Sep. 11th 2019, 13:00 UTC*. > >> > >> It is adopted by majority approval, with at least 3 PMC affirmative > votes. > >> > >> Thanks, > >> Jark > >> > >> [1] > >> > >> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345670 > >> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.8.2-rc1/ > >> [3] https://dist.apache.org/repos/dist/release/flink/KEYS > >> [4] > https://repository.apache.org/content/repositories/orgapacheflink-1262 > >> [5] > >> > >> > https://github.com/apache/flink/commit/6322618bb0f1b7942d86cb1b2b7bc55290d9e330 > >> [6] https://github.com/apache/flink-web/pull/262 > >> > >
Re: [DISCUSS] FLIP-63: Rework table partition support
Hi Jingsong,

Thank you for bringing up this discussion. Since I don't have much experience with Flink table/SQL, I'll ask some questions from a runtime or engine perspective.

> ... where we describe how to partition support in flink and how to integrate to hive partition.

FLIP-27 [1] officially introduces the "partition" concept. The changes of FLIP-27 are not only about the source interface but also about the whole infrastructure. Have you thought about how to integrate your proposal with these changes? Or do you just want to support "partition" in the table layer, with no requirements on the underlying infrastructure? I have seen a discussion [2] that seems to be a requirement on the infrastructure to support your proposal. So I have some concerns that there might be conflicts between this proposal and FLIP-27.

1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
2. http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-notifyOnMaster-for-notifyCheckpointComplete-td32769.html

Thanks, Biao /'bɪ.aʊ/

On Fri, 6 Sep 2019 at 13:22, JingsongLee wrote: > Hi everyone, thank you for your comments. Mail name was updated > and streaming-related concepts were added. > > We would like to start a discussion thread on "FLIP-63: Rework table > partition support"(Design doc: [1]), where we describe how to partition > support in flink and how to integrate to hive partition. > > This FLIP addresses: >- Introduce whole story about partition support. >- Introduce and discuss DDL of partition support. >- Introduce static and dynamic partition insert. >- Introduce partition pruning >- Introduce dynamic partition implementation >- Introduce FileFormatSink to deal with streaming exactly-once and > partition-related logic. > > Details can be seen in the design document. > Looking forward to your feedbacks. Thank you. > > [1] > https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing > > Best, > Jingsong Lee