Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink
Hi, Generally +1 for a native k8s HA service. For leader election & publishing leader information, there was a discussion[1] which pointed out that since these two actions are NOT atomic, there will always be an edge case where a previous leader overwrites the leader information, even with versioned writes. A versioned write only helps by forcing a re-read when the version mismatches, so for versioned writes to work, the information in the kv pair should let a contender tell whether it is still the current leader. The idea of writing the leader information on the contender node, or something equivalent, makes sense, but the details depend on how it is implemented. The general problems are that 1. The TM might lag a bit before it picks up the correct leader information, but as long as the leader election process is short and leadership is stable most of the time, it won't be a serious issue. 2. The process by which the TM extracts the leader information might be a bit more complex than directly watching a fixed key. The atomicity issue can be addressed by leveraging low-level APIs such as lease & txn, but that requires more development effort. ConfigMap and the encapsulated interface, though, provide only a self-consistent mechanism which doesn't promise stronger consistency for extensions. Best, tison. [1] https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E Till Rohrmann 于2020年9月29日周二 下午9:25写道: > For 1. I was wondering whether we can't write the leader connection > information directly when trying to obtain the leadership (trying to update > the leader key with one's own value)? This might be a little detail, > though. > > 2. Alright, so we are having a similar mechanism as we have in ZooKeeper > with the ephemeral lock nodes. I guess that this complicates the > implementation a bit, unfortunately. > > 3. Wouldn't the StatefulSet solution also work without a PV? One could > configure a different persistent storage like HDFS or S3 for storing the > checkpoints and job blobs like in the ZooKeeper case. The current benefit I > see is that we avoid having to implement this multi locking mechanism in > the ConfigMaps using the annotations because we can be sure that there is > only a single leader at a time if I understood the guarantees of K8s > correctly. > > Cheers, > Till > > On Tue, Sep 29, 2020 at 8:10 AM Yang Wang wrote: > > > Hi Till, thanks for your valuable feedback. > > > > 1. Yes, leader election and storing leader information will use a same > > ConfigMap. When a contender successfully performs a versioned annotation > > update operation to the ConfigMap, it means that it has been elected as > the > > leader. And it will write the leader information in the callback of > leader > > elector[1]. The Kubernetes resource version will help us to avoid the > > leader ConfigMap is wrongly updated. > > > > 2. The lock and release is really a valid concern. Actually in current > > design, we could not guarantee that the node who tries to write his > > ownership is the real leader. Who writes later, who is the owner. To > > address this issue, we need to store all the owners of the key. Only when > > the owner is empty, the specific key(means a checkpoint or job graph) > could > > be deleted. However, we may have a residual checkpoint or job graph when > > the old JobManager crashed exceptionally and do not release the lock. 
To > > solve this problem completely, we need a timestamp renew mechanism > > for CompletedCheckpointStore and JobGraphStore, which could help us to > the > > check the JobManager timeout and then clean up the residual keys. > > > > 3. Frankly speaking, I am not against with this solution. However, in my > > opinion, it is more like a temporary proposal. We could use StatefulSet > to > > avoid leader election and leader retrieval. But I am not sure whether > > TaskManager could properly handle the situation that same hostname with > > different IPs, because the JobManager failed and relaunched. Also we may > > still have two JobManagers running in some corner cases(e.g. kubelet is > > down but the pod is running). Another concern is we have a strong > > dependency on the PersistentVolume(aka PV) in FileSystemHAService. But it > > is not always true especially in self-build Kubernetes cluster. Moreover, > > PV provider should guarantee that each PV could only be mounted once. > Since > > the native HA proposal could cover all the functionality of StatefulSet > > proposal, that's why I prefer the former. > > > > > > [1]. > > > https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/ku
Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink
Thanks for your explanation. It would be fine if only checking leadership & actually write information is atomic. Best, tison. Yang Wang 于2020年9月30日周三 下午3:57写道: > Thanks till and tison for your comments. > > @Till Rohrmann > 1. I am afraid we could not do this if we are going to use fabric8 > Kubernetes client SDK for the leader election. The official Kubernetes Java > client[1] also could not support it. Unless we implement a new > LeaderElector in Flink based on the very basic Kubernetes API. But it seems > that we could gain too much from this. > > 2. Yes, the implementation will be a little complicated if we want to > completely eliminate the residual job graphs or checkpoints. Inspired by > your suggestion, another different solution has come into my mind. We could > use a same ConfigMap storing the JobManager leader, job graph, > checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for > the HA meta storage. Then it will be easier to guarantee that only the > leader could write the ConfigMap in a transactional operation. Since > “Get(check the leader)-and-Update(write back to the ConfigMap)” is a > transactional operation. > > 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However, > we still have the chances that two JobManager are running and trying to > get/delete a key in the same ConfigMap concurrently. Imagine that the > kubelet(like NodeManager in YARN) is down, and then the JobManager could > not be deleted. A new JobManager pod will be launched. We are just in the > similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only benefit > is we do not need to implement a leader election/retrieval service. > > @tison > Actually, I do not think we will have such issue in the Kubernetes HA > service. In the Kubernetes LeaderElector[2], we have the leader information > stored on the annotation of leader ConfigMap. So it would not happen the > old leader could wrongly override the leader information. Once a JobManager > want to write his leader information to the ConfigMap, it will check > whether it is the leader now. If not, anything will happen. Moreover, the > Kubernetes Resource Version[3] ensures that no one else has snuck in and > written a different update while the client was in the process of > performing its update. > > > [1]. > https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java > [2]. > https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java > <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70> > [3]. > https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion > > > Best, > Yang > > tison 于2020年9月30日周三 下午3:21写道: > >> Hi, >> >> Generally +1 for a native k8s HA service. >> >> For leader election & publish leader information, there was a >> discussion[1] >> pointed out that since these two actions is NOT atomic, there will be >> always >> edge case where a previous leader overwrite leader information, even with >> versioned write. 
Versioned write helps on read again if version mismatches >> so if we want version write works, information in the kv pair should help >> the >> contender reflects whether it is the current leader. >> >> The idea of writes leader information on contender node or something >> equivalent makes sense but the details depends on how it is implemented. >> General problems are that >> >> 1. TM might be a bit late before it updated correct leader information >> but >> only if the leader election process is short and leadership is stable at >> most >> time, it won't be a serious issue. >> 2. The process TM extract leader information might be a bit more complex >> than directly watching a fixed key. >> >> Atomic issue can be addressed if one leverages low APIs such as lease & >> txn >> but it causes more developing efforts. ConfigMap and encapsulated >> interface, >> thought, provides only a self-consistent mechanism which doesn't promise >> more consistency for extension. >> >> Best, >> tison. >> >> [1] >> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E >> >> >> &g
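A minimal sketch of the "Get (check the leader) and Update (write back)" pattern discussed in this thread, using the fabric8 ConfigMap API and Kubernetes' optimistic concurrency via the resource version. The annotation key, class name, and identifiers below are illustrative placeholders, not the actual Flink implementation:

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import java.util.HashMap;
import java.util.Map;

public class LeaderConfigMapWriter {
  // Illustrative annotation key; the real leader annotation and its format are an implementation detail.
  private static final String LEADER_ANNOTATION = "control-plane.alpha.kubernetes.io/leader";

  /** Writes key=value only if we still appear to be the leader; returns false on lost leadership or a lost race. */
  static boolean writeIfLeader(KubernetesClient client, String ns, String name,
                               String ownLeaderId, String key, String value) {
    ConfigMap cm = client.configMaps().inNamespace(ns).withName(name).get();
    if (cm == null) {
      return false;
    }
    Map<String, String> anns = cm.getMetadata().getAnnotations();
    if (anns == null || !ownLeaderId.equals(anns.get(LEADER_ANNOTATION))) {
      return false; // someone else holds the leadership, do not write
    }
    if (cm.getData() == null) {
      cm.setData(new HashMap<>());
    }
    cm.getData().put(key, value);
    try {
      // replace() carries the resourceVersion of the read above; the API server rejects the
      // request with 409 Conflict if anything else updated the ConfigMap in between.
      client.configMaps().inNamespace(ns).withName(name).replace(cm);
      return true;
    } catch (KubernetesClientException e) {
      return false; // lost the optimistic-concurrency race; the caller may re-check leadership and retry
    }
  }
}

The point is that the replace carries the resource version of the earlier read, so a stale leader cannot silently overwrite a newer update; it gets a conflict and has to re-check its leadership first.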
Re: Subscription
Please send email with any content to -subscr...@flink.apache.org for subscription. For example, mailto:user-zh-subscr...@flink.apache.org to subscribe user...@flink.apache.org Best, tison. 葛春法-18667112979 于2020年10月8日周四 下午8:45写道: > I want to subscribe flink mail.
Re: (DISSCUSS) flink cli need load '--classpath' files
I think the problem is that --classpath should be before the user jar, i.e., /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar Best, tison. Aljoscha Krettek 于2020年3月6日周五 下午10:03写道: > Hi, > > first a preliminary question: does the jar file contain > com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar > here? > > Best, > Aljoscha > > On 06.03.20 13:25, ouywl wrote: > > Hi all > > When I start a flinkcluster in session mode, It include jm/tm. And > then I > > submit a job like ‘bin/flink run —jobmanager “ip:8081” —class path > a.jar’. Even > > the a.jar in all jm/tm and ‘bin/flink’ mechine . It will throw exception > “ > > /opt/flink/bin/flink run --jobmanager ip:8081 --class > > com.netease.java.TopSpeedWindowing --parallelism 1 --detached > > /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath > > file:///opt/flink/job/fastjson-1.2.66.jar > > Starting execution of program > > Executing TopSpeedWindowing example with default input data set. > > Use --input to specify file input. > > java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON > > at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) > > at > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) > > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) > > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)” > >As I read the code , flink cli have not load the —classspath jar, So > It seems > > a bug about the flink cli. Are you agree with me? > > Best, > > Ouywl > > >
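For reference, the invocation from the report above with --classpath moved in front of the user jar would look like this (paths copied from the report, not verified against that environment):

/opt/flink/bin/flink run --jobmanager ip:8081 --class com.netease.java.TopSpeedWindowing --parallelism 1 --detached --classpath file:///opt/flink/job/fastjson-1.2.66.jar /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar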
Re: (DISSCUSS) flink cli need load '--classpath' files
It is because as implementation when we parse command line argument it "stopAtNonOptions" at the arbitrary content user jar. All arguments later will be regarded as args passed to user main. For user serving, when you run `./bin/flink run -h`, it prints Action "run" compiles and runs a program. Syntax: run [OPTIONS] that explicit explains the format. Best, tison. tison 于2020年3月6日周五 下午10:22写道: > I think the problem is that --classpath should be before the user jar, > i.e., /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar > > Best, > tison. > > > Aljoscha Krettek 于2020年3月6日周五 下午10:03写道: > >> Hi, >> >> first a preliminary question: does the jar file contain >> com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar >> here? >> >> Best, >> Aljoscha >> >> On 06.03.20 13:25, ouywl wrote: >> > Hi all >> > When I start a flinkcluster in session mode, It include jm/tm. >> And then I >> > submit a job like ‘bin/flink run —jobmanager “ip:8081” —class path >> a.jar’. Even >> > the a.jar in all jm/tm and ‘bin/flink’ mechine . It will throw >> exception “ >> > /opt/flink/bin/flink run --jobmanager ip:8081 --class >> > com.netease.java.TopSpeedWindowing --parallelism 1 --detached >> > /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath >> > file:///opt/flink/job/fastjson-1.2.66.jar >> > Starting execution of program >> > Executing TopSpeedWindowing example with default input data set. >> > Use --input to specify file input. >> > java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON >> > at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98) >> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> > at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> > at >> > >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> > at java.lang.reflect.Method.invoke(Method.java:498) >> > at >> > >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) >> > at >> > >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) >> > at >> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) >> > at >> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)” >> >As I read the code , flink cli have not load the —classspath jar, So >> It seems >> > a bug about the flink cli. Are you agree with me? >> > Best, >> > Ouywl >> > >> >
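To make the "stopAtNonOptions" behavior concrete: commons-cli, which the CLI parsing is based on, stops option parsing at the first token it does not recognize as an option, i.e. the user jar, and that token plus everything after it is handed to the user main method as plain arguments. A small self-contained demo; the option definition here is made up for illustration and is not Flink's actual CLI code:

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

public class StopAtNonOptionDemo {
  public static void main(String[] args) throws ParseException {
    Options options = new Options();
    options.addOption(Option.builder("C").longOpt("classpath").hasArg().build());

    // Simulates: flink run <jar> --classpath file:///...   (the option placed AFTER the jar)
    String[] line = {"/opt/flink/job/job.jar", "--classpath", "file:///opt/lib/dep.jar"};

    // The third argument is stopAtNonOption: parsing stops at the first token that is not a
    // known option (the jar path here); that token and everything after it stay in the
    // leftover argument list, which is what ends up being passed to the user's main method.
    CommandLine cmd = new DefaultParser().parse(options, line, true);

    System.out.println(cmd.hasOption("classpath")); // false: the option was never parsed
    System.out.println(cmd.getArgList());           // [/opt/flink/job/job.jar, --classpath, file:///opt/lib/dep.jar]
  }
}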
Re: (DISSCUSS) flink cli need load '--classpath' files
Hi Jingsong, I think your proposal is "--classpath can occur behind the jar file". Generally speaking I agree that it is a painful required format, and users tend to ignore the order in which an option occurs, so it is +1 from my side to relax the constraint. However, for the migration and implementation part, things get a bit tricky. For the user interface, let's say we only enable --classpath to occur behind the jar file; at the very least the semantics change if a user passes --classpath intending it to be a main argument. Besides, given we are fixed on the commons-cli library to implement the CLI, it would be a bit tricky to implement such special-casing logic. I happened to encounter a similar CLI problem recently, so here are some of my thoughts on the problem. 1. I agree that users tend to treat all options the same as [OPTIONS] and mix up the order. It would be an improvement if we relaxed the constraint. 2. Then, we still have to introduce something that lets users specify their args for the main method. 3. In order to achieve 2, there is a mature solution in shell scripting that uses a double-dash (--) to signify the end of command options. 4. Now, if we keep the jar as a positional argument, then to support mixing positional and named arguments we might have to switch to another library such as argparse4j, since commons-cli doesn't support positional arguments. An alternative is to change it into a named argument, but then we break the user interface twice. Either way, it will break the user interface, so we first **MUST** start a discussion and see what the community thinks of it and, if agreed, how to integrate it. For me, reading the doc is an easy solution that saves us from breaking the user interface; I don't insist on relaxing the constraint. Best, tison. Jingsong Li 于2020年3月6日周五 下午10:27写道: > Hi tison and Aljoscha, > > Do you think "--classpath can not be in front of jar file" is an > improvement? Or need documentation? Because I used to be confused. > > Best, > Jingsong Lee > > On Fri, Mar 6, 2020 at 10:22 PM tison wrote: > >> I think the problem is that --classpath should be before the user jar, >> i.e., /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar >> >> Best, >> tison. >> >> >> Aljoscha Krettek 于2020年3月6日周五 下午10:03写道: >> >>> Hi, >>> >>> first a preliminary question: does the jar file contain >>> com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar >>> here? >>> >>> Best, >>> Aljoscha >>> >>> On 06.03.20 13:25, ouywl wrote: >>> > Hi all >>> > When I start a flinkcluster in session mode, It include jm/tm. >>> And then I >>> > submit a job like ‘bin/flink run —jobmanager “ip:8081” —class path >>> a.jar’. Even >>> > the a.jar in all jm/tm and ‘bin/flink’ mechine . It will throw >>> exception “ >>> > /opt/flink/bin/flink run --jobmanager ip:8081 --class >>> > com.netease.java.TopSpeedWindowing --parallelism 1 --detached >>> > /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath >>> > file:///opt/flink/job/fastjson-1.2.66.jar >>> > Starting execution of program >>> > Executing TopSpeedWindowing example with default input data set. >>> > Use --input to specify file input. 
>>> > java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON >>> > at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98) >>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> > at >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> > at >>> > >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> > at java.lang.reflect.Method.invoke(Method.java:498) >>> > at >>> > >>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) >>> > at >>> > >>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) >>> > at >>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) >>> > at >>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)” >>> >As I read the code , flink cli have not load the —classspath jar, >>> So It seems >>> > a bug about the flink cli. Are you agree with me? >>> > Best, >>> > Ouywl >>> > >>> >> > > -- > Best, Jingsong Lee >
Re: Flink Conf "yarn.flink-dist-jar" Question
Yes your requirement is exactly taken into consideration by the community. We currently have an open JIRA ticket for the specific feature[1] and works for loosing the constraint of flink-jar schema to support DFS location should happen. Best, tison. [1] https://issues.apache.org/jira/browse/FLINK-13938 Hailu, Andreas 于2020年3月7日周六 上午2:03写道: > Hi, > > > > We noticed that every time an application runs, it uploads the flink-dist > artifact to the /user//.flink HDFS directory. This causes a user disk > space quota issue as we submit thousands of apps to our cluster an hour. We > had a similar problem with our Spark applications where it uploaded the > Spark Assembly package for every app. Spark provides an argument to use a > location in HDFS its for applications to leverage so they don’t need to > upload them for every run, and that was our solution (see “spark.yarn.jar” > configuration if interested.) > > > > Looking at the Resource Orchestration Frameworks page > <https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#yarn-flink-dist-jar>, > I see there’s might be a similar concept through a “yarn.flink-dist-jar” > configuration option. I wanted to place the flink-dist package we’re using > in a location in HDFS and configure out jobs to point to it, e.g. > > > > yarn.flink-dist-jar: hdfs:user/delp/.flink/flink-dist_2.11-1.9.1.jar > > > > Am I correct in that this is what I’m looking for? I gave this a try with > some jobs today, and based on what I’m seeing in the launch_container.sh in > our YARN application, it still looks like it’s being uploaded: > > > > export > _FLINK_JAR_PATH="hdfs://d279536/user/delp/.flink/application_1583031705852_117863/flink-dist_2.11-1.9.1.jar" > > > > How can I confirm? Or is this perhaps not config I’m looking for? > > > > Best, > > Andreas > > -- > > Your Personal Data: We may collect and process information about you that > may be subject to data protection laws. For more information about how we > use and disclose your personal data, how we protect your information, our > legal basis to use your information, your rights and who you can contact, > please refer to: www.gs.com/privacy-notices >
Re: Flink Conf "yarn.flink-dist-jar" Question
FLINK-13938 seems a bit different than your requirement. The one totally matches is FLINK-14964 <https://issues.apache.org/jira/browse/FLINK-14964>. I'll appreciate it if you can share you opinion on the JIRA ticket. Best, tison. tison 于2020年3月7日周六 上午2:35写道: > Yes your requirement is exactly taken into consideration by the community. > We currently have an open JIRA ticket for the specific feature[1] and works > for loosing the constraint of flink-jar schema to support DFS location > should happen. > > Best, > tison. > > [1] https://issues.apache.org/jira/browse/FLINK-13938 > > > Hailu, Andreas 于2020年3月7日周六 上午2:03写道: > >> Hi, >> >> >> >> We noticed that every time an application runs, it uploads the flink-dist >> artifact to the /user//.flink HDFS directory. This causes a user disk >> space quota issue as we submit thousands of apps to our cluster an hour. We >> had a similar problem with our Spark applications where it uploaded the >> Spark Assembly package for every app. Spark provides an argument to use a >> location in HDFS its for applications to leverage so they don’t need to >> upload them for every run, and that was our solution (see “spark.yarn.jar” >> configuration if interested.) >> >> >> >> Looking at the Resource Orchestration Frameworks page >> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#yarn-flink-dist-jar>, >> I see there’s might be a similar concept through a “yarn.flink-dist-jar” >> configuration option. I wanted to place the flink-dist package we’re using >> in a location in HDFS and configure out jobs to point to it, e.g. >> >> >> >> yarn.flink-dist-jar: hdfs:user/delp/.flink/flink-dist_2.11-1.9.1.jar >> >> >> >> Am I correct in that this is what I’m looking for? I gave this a try with >> some jobs today, and based on what I’m seeing in the launch_container.sh in >> our YARN application, it still looks like it’s being uploaded: >> >> >> >> export >> _FLINK_JAR_PATH="hdfs://d279536/user/delp/.flink/application_1583031705852_117863/flink-dist_2.11-1.9.1.jar" >> >> >> >> How can I confirm? Or is this perhaps not config I’m looking for? >> >> >> >> Best, >> >> Andreas >> >> -- >> >> Your Personal Data: We may collect and process information about you that >> may be subject to data protection laws. For more information about how we >> use and disclose your personal data, how we protect your information, our >> legal basis to use your information, your rights and who you can contact, >> please refer to: www.gs.com/privacy-notices >> >
Re: Flink 1.10 StopWithSavepoint only suit for the sources that implement the StoppableFunction interface?
The StoppableFunction is gone. See also https://issues.apache.org/jira/browse/FLINK-11889 Best, tison. LakeShen 于2020年3月12日周四 下午5:44写道: > Hi community, > now I am seeing the FLIP-45 , as I see the stop command only suit > for the sources that implement the StoppableFunction interface. > So I have a question is that if I use StopWithSavepoint command to > stop my flink task , just like './flink stop -p xxx ...', this command > only suit for the sources that implement the StoppableFunction interface, > is it correct? > Thanks to your reply. > > Best wishes, > LakeShen >
Re: How to change the flink web-ui jobServer?
IIRC Flink on Kubernetes doesn't support configuring the rest port as a port range. Maybe Yang (in cc) can give more information, and if so, our current logic only takes care of RestOptions.PORT but not RestOptions.BIND_PORT, which would be a bug. Best, tison. LakeShen 于2020年3月15日周日 上午11:25写道: > Ok, thanks! Arvid > > Arvid Heise 于2020年3月10日周二 下午4:14写道: > >> Hi LakeShen, >> >> you can change the port with >> >> conf.setInteger(RestOptions.PORT, 8082); >> >> or if want to be on the safe side specify a range >> >> conf.setString(RestOptions.BIND_PORT, "8081-8099"); >> >> >> On Mon, Mar 9, 2020 at 10:47 AM LakeShen >> wrote: >> >>> Hi community, >>>now I am moving the flink job to k8s,and I plan to use the >>> ingress to show the flink web ui , the problem is that fink job server >>> aren't correct, so I want to change the flink web-ui jobserver ,I don't >>> find the any method to change it ,are there some method to do that? >>>Thanks to your reply. >>> >>> Best wishes, >>> LakeShen >>> >>
Re: Problem compiling the latest code
Run mvn clean install -DskipTests from the flink/ root directory. Your problem happens because those impl classes are generated classes; generally speaking, running a full build once from the root directory resolves all kinds of tricky issues like this. Best, tison. 吴志勇 <1154365...@qq.com> 于2020年3月16日周一 下午4:23写道: > Hello, > I downloaded the latest code from GitHub. When I try to compile it in IDEA, the flink-sql-parser module of the flink-table project fails to compile, > > and the tests fail in the same way. > > How can I solve this? It looks like flink-sql-parser is missing the impl package. >
Re: Problem compiling the latest code
Hi, You'd better use English on the user mailing list. If you prefer Chinese, you can post the email to user...@flink.apache.org . Best, tison. tison 于2020年3月16日周一 下午4:25写道: > Run mvn clean install -DskipTests from the flink/ root directory > > Your problem happens because those impl classes are generated classes; generally speaking, running a full build once from the root directory resolves all kinds of tricky issues like this > > Best, > tison. > > > 吴志勇 <1154365...@qq.com> 于2020年3月16日周一 下午4:23写道: > >> Hello, >> I downloaded the latest code from GitHub. When I try to compile it in IDEA, the flink-sql-parser module of the flink-table project fails to compile, >> >> and the tests fail in the same way. >> >> How can I solve this? It looks like flink-sql-parser is missing the impl package. >> >
Re: Flink YARN app terminated before the client receives the result
Hi Weike & Till, I agree with Till and it is also the analysis from my side. However, it seems even if we don't have FLINK-15116, it is still possible that we complete the cancel future but the cluster got shutdown before it properly delivered the response. There is one thing strange that this behavior almost reproducible, it should be a possible order but not always. Maybe previous we have to firstly cancel the job which has a long call chain so that it happens we have enough time to delivered the response. But the resolution looks like we introduce some synchronization/finalization logics that clear these outstanding future with best effort before the cluster(RestServer) down. Best, tison. Till Rohrmann 于2020年3月17日周二 上午4:12写道: > Hi Weike, > > could you share the complete logs with us? Attachments are being filtered > out by the Apache mail server but it works if you upload the logs somewhere > (e.g. https://gist.github.com/) and then share the link with us. Ideally > you run the cluster with DEBUG log settings. > > I assume that you are running Flink 1.10, right? > > My suspicion is that this behaviour has been introduced with FLINK-15116 > [1]. It looks as if we complete the shutdown future in > MiniDispatcher#cancelJob before we return the response to the > RestClusterClient. My guess is that this triggers the shutdown of the > RestServer which then is not able to serve the response to the client. I'm > pulling in Aljoscha and Tison who introduced this change. They might be > able to verify my theory and propose a solution for it. > > [1] https://issues.apache.org/jira/browse/FLINK-15116 > > Cheers, > Till > > On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike > wrote: > >> Hi Yangze and all, >> >> I have tried numerous times, and this behavior persists. >> >> Below is the tail log of taskmanager.log: >> >> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO >> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot >> TaskSlot(index:0, state:ACTIVE, resource profile: >> ResourceProfile{cpuCores=1., taskHeapMemory=1.503gb >> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb >> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)}, >> allocationId: d3acaeac3db62454742e800b5410adfd, jobId: >> d0a674795be98bd2574d9ea3286801cb). >> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO >> org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove job >> d0a674795be98bd2574d9ea3286801cb from job leader monitoring. >> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO >> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager >> connection for job d0a674795be98bd2574d9ea3286801cb. >> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO >> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager >> connection for job d0a674795be98bd2574d9ea3286801cb. >> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO >> org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot reconnect >> to job d0a674795be98bd2574d9ea3286801cb because it is not registered. >> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO >> org.apache.flink.yarn.YarnTaskExecutorRunner - RECEIVED SIGNAL 15: >> SIGTERM. Shutting down as requested. >> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO >> org.apache.flink.yarn.YarnTaskExecutorRunner - RECEIVED SIGNAL 15: >> SIGTERM. Shutting down as requested. 
>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO >> org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB >> cache >> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown >> hook] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl - >> FileChannelManager removed spill file directory >> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d >> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown >> hook] INFO >> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - >> Shutting down TaskExecutorLocalStateStoresManager. >> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO >> org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB >> cache >> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO >> org.apache.flink.runtime.io.disk.FileChannelManagerImpl - >> FileChannelManager removed spill file directory >> /data/emr/yarn/local/usercache/hadoo
Re: Flink YARN app terminated before the client receives the result
edit: previously after the cancellation we have a longer call chain to #jobReachedGloballyTerminalState which does the archive job & JM graceful showdown, which might take some time so that ... Best, tison. tison 于2020年3月17日周二 上午10:13写道: > Hi Weike & Till, > > I agree with Till and it is also the analysis from my side. However, it > seems even if we don't have FLINK-15116, it is still possible that we > complete the cancel future but the cluster got shutdown before it properly > delivered the response. > > There is one thing strange that this behavior almost reproducible, it > should be a possible order but not always. Maybe previous we have to > firstly cancel the job which has a long call chain so that it happens we > have enough time to delivered the response. > > But the resolution looks like we introduce some > synchronization/finalization logics that clear these outstanding future > with best effort before the cluster(RestServer) down. > > Best, > tison. > > > Till Rohrmann 于2020年3月17日周二 上午4:12写道: > >> Hi Weike, >> >> could you share the complete logs with us? Attachments are being filtered >> out by the Apache mail server but it works if you upload the logs somewhere >> (e.g. https://gist.github.com/) and then share the link with us. Ideally >> you run the cluster with DEBUG log settings. >> >> I assume that you are running Flink 1.10, right? >> >> My suspicion is that this behaviour has been introduced with FLINK-15116 >> [1]. It looks as if we complete the shutdown future in >> MiniDispatcher#cancelJob before we return the response to the >> RestClusterClient. My guess is that this triggers the shutdown of the >> RestServer which then is not able to serve the response to the client. I'm >> pulling in Aljoscha and Tison who introduced this change. They might be >> able to verify my theory and propose a solution for it. >> >> [1] https://issues.apache.org/jira/browse/FLINK-15116 >> >> Cheers, >> Till >> >> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike >> wrote: >> >>> Hi Yangze and all, >>> >>> I have tried numerous times, and this behavior persists. >>> >>> Below is the tail log of taskmanager.log: >>> >>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO >>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot >>> TaskSlot(index:0, state:ACTIVE, resource profile: >>> ResourceProfile{cpuCores=1., taskHeapMemory=1.503gb >>> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb >>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)}, >>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId: >>> d0a674795be98bd2574d9ea3286801cb). >>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO >>> org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove job >>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring. >>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO >>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager >>> connection for job d0a674795be98bd2574d9ea3286801cb. >>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO >>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager >>> connection for job d0a674795be98bd2574d9ea3286801cb. >>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO >>> org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot reconnect >>> to job d0a674795be98bd2574d9ea3286801cb because it is not registered. 
>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO >>> org.apache.flink.yarn.YarnTaskExecutorRunner - RECEIVED SIGNAL 15: >>> SIGTERM. Shutting down as requested. >>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO >>> org.apache.flink.yarn.YarnTaskExecutorRunner - RECEIVED SIGNAL 15: >>> SIGTERM. Shutting down as requested. >>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO >>> org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB >>> cache >>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown >>> hook] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl - >>> FileChannelManager removed spill file directory >>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d >>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStores
Re: Flink YARN app terminated before the client receives the result
JIRA created as https://jira.apache.org/jira/browse/FLINK-16637 Best, tison. Till Rohrmann 于2020年3月17日周二 下午5:57写道: > @Tison could you create an issue to track the problem. Please also link > the uploaded log file for further debugging. > > I think the reason why it worked in Flink 1.9 could have been that we had > a async callback in the longer chain which broke the flow of execution and > allowed to send the response. This is no longer the case. As an easy fix > one could change thenAccept into thenAcceptAsync in the > MiniDispatcher#cancelJob. But this only solves symptoms. Maybe we should > think about allowing not only StatusHandler to close asynchronously. At the > moment we say that all other handler shut down immediately (see > AbstractHandler#closeHandlerAsync). But the problem with this change would > be that all handler would become stateful because they would need to > remember whether a request is currently ongoing or not. > > Cheers, > Till > > On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike > wrote: > >> Hi Tison & Till and all, >> >> I have uploaded the client, taskmanager and jobmanager log to Gist ( >> https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and I >> can reproduce this bug every time when trying to cancel Flink 1.10 jobs on >> YARN. >> >> Besides, in earlier Flink versions like 1.9, the REST API for *cancelling >> job with a savepoint *sometimes throws exceptions to the client side due >> to >> early shutdown of the server, even though the savepoint was successfully >> completed by reviewing the log, however when using the newly introduced >> *stop* API, that bug disappeared, however, *cancel* API seems to be buggy >> now. >> >> Best, >> Weike >> >> On Tue, Mar 17, 2020 at 10:17 AM tison wrote: >> >> > edit: previously after the cancellation we have a longer call chain to >> > #jobReachedGloballyTerminalState which does the archive job & JM >> graceful >> > showdown, which might take some time so that ... >> > >> > Best, >> > tison. >> > >> > >> > tison 于2020年3月17日周二 上午10:13写道: >> > >> >> Hi Weike & Till, >> >> >> >> I agree with Till and it is also the analysis from my side. However, it >> >> seems even if we don't have FLINK-15116, it is still possible that we >> >> complete the cancel future but the cluster got shutdown before it >> properly >> >> delivered the response. >> >> >> >> There is one thing strange that this behavior almost reproducible, it >> >> should be a possible order but not always. Maybe previous we have to >> >> firstly cancel the job which has a long call chain so that it happens >> we >> >> have enough time to delivered the response. >> >> >> >> But the resolution looks like we introduce some >> >> synchronization/finalization logics that clear these outstanding future >> >> with best effort before the cluster(RestServer) down. >> >> >> >> Best, >> >> tison. >> >> >> >> >> >> Till Rohrmann 于2020年3月17日周二 上午4:12写道: >> >> >> >>> Hi Weike, >> >>> >> >>> could you share the complete logs with us? Attachments are being >> >>> filtered out by the Apache mail server but it works if you upload the >> logs >> >>> somewhere (e.g. https://gist.github.com/) and then share the link >> with >> >>> us. Ideally you run the cluster with DEBUG log settings. >> >>> >> >>> I assume that you are running Flink 1.10, right? >> >>> >> >>> My suspicion is that this behaviour has been introduced with >> FLINK-15116 >> >>> [1]. 
It looks as if we complete the shutdown future in >> >>> MiniDispatcher#cancelJob before we return the response to the >> >>> RestClusterClient. My guess is that this triggers the shutdown of the >> >>> RestServer which then is not able to serve the response to the >> client. I'm >> >>> pulling in Aljoscha and Tison who introduced this change. They might >> be >> >>> able to verify my theory and propose a solution for it. >> >>> >> >>> [1] https://issues.apache.org/jira/browse/FLINK-15116 >> >>> >> >>> Cheers, >> >>> Till >> >>> >> >>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike >> >
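For readers following the thenAccept vs. thenAcceptAsync point above: with thenAccept the dependent action may run synchronously in the thread that completes the future, so the shutdown can start before the REST response is sent, while thenAcceptAsync hands the action to another executor and lets the completing thread return first. A simplified, self-contained sketch using plain java.util.concurrent, not the actual MiniDispatcher code:

import java.util.concurrent.CompletableFuture;

public class ThenAcceptVsAsyncDemo {
  public static void main(String[] args) throws Exception {
    CompletableFuture<String> cancelFuture = new CompletableFuture<>();

    // Synchronous dependent: runs in whatever thread calls complete() below,
    // i.e. the shutdown would start before the completing code can send a response.
    cancelFuture.thenAccept(ack -> System.out.println("sync callback: shut down cluster"));

    // Asynchronous dependent: scheduled on the common pool, so the completing
    // thread returns first and can still deliver the response to the client.
    cancelFuture.thenAcceptAsync(ack -> System.out.println("async callback: shut down cluster"));

    cancelFuture.complete("cancellation acknowledged");
    System.out.println("completing thread: response sent to client");

    Thread.sleep(200); // only so the async callback gets a chance to print in this demo
  }
}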
Re: JobMaster does not register with ResourceManager in high availability setup
Hi Abhinav, The problem is Curator: Background operation retry gave up So it is the ZK ensemble too unstable to get recovery in time so that Curator stopped retrying and threw a fatal error. Best, tison. Xintong Song 于2020年3月18日周三 上午10:22写道: > I'm not familiar with ZK either. > > I've copied Yang Wang, who might be able to provide some suggestions. > > Alternatively, you can try to post your question to the Apache ZooKeeper > community, see if they have any clue. > > Thank you~ > > Xintong Song > > > > On Wed, Mar 18, 2020 at 8:12 AM Bajaj, Abhinav > wrote: > >> Hi Xintong, >> >> >> >> I did check the Zk logs and didn’t notice anything interesting. >> >> I have limited expertise in zookeeper. >> >> Can you share an example of what I should be looking for in Zk? >> >> >> >> I was able to reproduce this issue again with Flink 1.7 by killing the >> zookeeper leader that disrupted the quorum. >> >> The sequence of logs in this case look quite similar to one we have been >> discussing. >> >> >> >> If the code hasn’t changed in this area till 1.10 then maybe the latest >> version also has the potential issue. >> >> >> >> Its not straightforward to bump up the Flink version in the >> infrastructure available to me. >> >> But I will think if there is a way around it. >> >> >> >> ~ Abhinav Bajaj >> >> >> >> *From: *Xintong Song >> *Date: *Monday, March 16, 2020 at 8:00 PM >> *To: *"Bajaj, Abhinav" >> *Cc: *"user@flink.apache.org" >> *Subject: *Re: JobMaster does not register with ResourceManager in high >> availability setup >> >> >> >> Hi Abhinav, >> >> >> >> I think you are right. The log confirms that JobMaster has not tried to >> connect ResourceManager. Most likely the JobMaster requested for RM address >> but has never received it. >> >> >> >> I would suggest you to check the ZK logs, see if the request form JM for >> RM address has been received and properly responded. >> >> >> >> If you can easily reproduce this problem, and you are able to build Flink >> from source, you can also try to insert more logs in Flink to further >> confirm whether the RM address is received. I don't think that's necessary >> though, since those codes have not been changed since Flink 1.7 till the >> latest 1.10, and I'm not aware of any reported issue that the JM may not >> try to connect RM once the address is received. >> >> >> >> Thank you~ >> >> Xintong Song >> >> >> >> >> >> On Tue, Mar 17, 2020 at 7:45 AM Bajaj, Abhinav >> wrote: >> >> Hi Xintong, >> >> >> >> Apologies for delayed response. I was away for a week. >> >> I am attaching more jobmanager logs. >> >> >> >> To your point on the taskmanagers, the job is deployed with 20 >> parallelism but it has 22 TMs to have 2 of them as spare to assist in quick >> failover. >> >> I did check the logs and all 22 of task executors from those TMs get >> registered by the time - 2020-02-27 06:35:47.050. >> >> >> >> You would notice that even after this time, the job fails with the error >> “org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: >> Could not allocate all requires slots within timeout of 30 ms. Slots >> required: 201, slots allocated: 0” at 2020-02-27 06:40:36.778. >> >> >> >> Thanks a ton for you help. >> >> >> >> ~ Abhinav Bajaj >> >> >> >> *From: *Xintong Song >> *Date: *Thursday, March 5, 2020 at 6:30 PM >> *To: *"Bajaj, Abhinav" >> *Cc: *"user@flink.apache.org" >> *Subject: *Re: JobMaster does not register with ResourceManager in high >> availability setup >> >> >> >> Hi Abhinav, >> >> >> >> Thanks for the log. 
However, the attached log seems to be incomplete. >> The NoResourceAvailableException cannot be found in this log. >> >> >> >> Regarding connecting to ResourceManager, the log suggests that: >> >>- ZK was back to life and connected at 06:29:56. >>2020-02-27 06:29:56.539 [main-EventThread] level=INFO >> o.a.f.s.c.o.a.curator.framework.state.ConnectionStateManager - State >>change: CONNECTED >>- RM registered to ZK and was granted leadership at 06:30:01. >&g
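As a mitigation when the ZooKeeper ensemble is unstable, the Curator client used by Flink can be given more headroom before it gives up, for example via the following flink-conf.yaml options (the values below are only illustrative, not a recommendation):

high-availability.zookeeper.client.session-timeout: 60000
high-availability.zookeeper.client.connection-timeout: 15000
high-availability.zookeeper.client.retry-wait: 5000
high-availability.zookeeper.client.max-retry-attempts: 10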
Re: JobMaster does not register with ResourceManager in high availability setup
Sorry I mixed up the log, it belongs to previous failure. Could you trying to reproduce the problem with DEBUG level log? >From the log we knew that JM & RM had been elected as leader but the listener didn't work. However, we didn't know it is because the leader didn't publish the leader info or the listener didn't get notified. Best, tison. tison 于2020年3月18日周三 上午10:40写道: > Hi Abhinav, > > The problem is > > Curator: Background operation retry gave up > > So it is the ZK ensemble too unstable to get recovery in time so that > Curator stopped retrying and threw a fatal error. > > Best, > tison. > > > Xintong Song 于2020年3月18日周三 上午10:22写道: > >> I'm not familiar with ZK either. >> >> I've copied Yang Wang, who might be able to provide some suggestions. >> >> Alternatively, you can try to post your question to the Apache ZooKeeper >> community, see if they have any clue. >> >> Thank you~ >> >> Xintong Song >> >> >> >> On Wed, Mar 18, 2020 at 8:12 AM Bajaj, Abhinav >> wrote: >> >>> Hi Xintong, >>> >>> >>> >>> I did check the Zk logs and didn’t notice anything interesting. >>> >>> I have limited expertise in zookeeper. >>> >>> Can you share an example of what I should be looking for in Zk? >>> >>> >>> >>> I was able to reproduce this issue again with Flink 1.7 by killing the >>> zookeeper leader that disrupted the quorum. >>> >>> The sequence of logs in this case look quite similar to one we have been >>> discussing. >>> >>> >>> >>> If the code hasn’t changed in this area till 1.10 then maybe the latest >>> version also has the potential issue. >>> >>> >>> >>> Its not straightforward to bump up the Flink version in the >>> infrastructure available to me. >>> >>> But I will think if there is a way around it. >>> >>> >>> >>> ~ Abhinav Bajaj >>> >>> >>> >>> *From: *Xintong Song >>> *Date: *Monday, March 16, 2020 at 8:00 PM >>> *To: *"Bajaj, Abhinav" >>> *Cc: *"user@flink.apache.org" >>> *Subject: *Re: JobMaster does not register with ResourceManager in high >>> availability setup >>> >>> >>> >>> Hi Abhinav, >>> >>> >>> >>> I think you are right. The log confirms that JobMaster has not tried to >>> connect ResourceManager. Most likely the JobMaster requested for RM address >>> but has never received it. >>> >>> >>> >>> I would suggest you to check the ZK logs, see if the request form JM for >>> RM address has been received and properly responded. >>> >>> >>> >>> If you can easily reproduce this problem, and you are able to build >>> Flink from source, you can also try to insert more logs in Flink to further >>> confirm whether the RM address is received. I don't think that's necessary >>> though, since those codes have not been changed since Flink 1.7 till the >>> latest 1.10, and I'm not aware of any reported issue that the JM may not >>> try to connect RM once the address is received. >>> >>> >>> >>> Thank you~ >>> >>> Xintong Song >>> >>> >>> >>> >>> >>> On Tue, Mar 17, 2020 at 7:45 AM Bajaj, Abhinav >>> wrote: >>> >>> Hi Xintong, >>> >>> >>> >>> Apologies for delayed response. I was away for a week. >>> >>> I am attaching more jobmanager logs. >>> >>> >>> >>> To your point on the taskmanagers, the job is deployed with 20 >>> parallelism but it has 22 TMs to have 2 of them as spare to assist in quick >>> failover. >>> >>> I did check the logs and all 22 of task executors from those TMs get >>> registered by the time - 2020-02-27 06:35:47.050. 
>>> >>> >>> >>> You would notice that even after this time, the job fails with the error >>> “org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: >>> Could not allocate all requires slots within timeout of 30 ms. Slots >>> required: 201, slots allocated: 0” at 2020-02-27 06:40:36.778. >>> >>> >>> >>> Thanks a ton for you help. >>> >>> >>> >>> ~ A
Re: Windows on SinkFunctions
Hi Sidney, For this case, you can simply write stream ... .window(...) .apply(...) .addSink(...) and let operator chaining chain these operators into one, so you don't have to worry about efficiency; see the sketch below. Best, tison. Sidney Feiner 于2020年3月22日周日 下午10:03写道: > Hey, > I wanted to know if it's possible to define a SinkFunction as a > WindowFunction as well. > For example, I would like the sink to be invoked every 5 minute or once > 500 events reached the sink. > Is there a way to do this inside the sink implementation? Or do I have to > create the windows prior in the pipeline? > Because if I have multiple sinks that that only for one of them I need a > Window, the second solution might be problematic. > > Thanks :) > > *Sidney Feiner* */* Data Platform Developer > M: +972.528197720 */* Skype: sidney.feiner.startapp > >
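A minimal sketch of that chain in the DataStream API, assuming a keyed stream and a 5-minute processing-time window; the element, key, and sink types are placeholders, not code from this thread:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowedSinkSketch {
  static void attachWindowedSink(DataStream<Event> stream) {
    stream
        .keyBy(e -> e.key)                                          // window() needs a keyed stream
        .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))  // fire every 5 minutes
        .apply(new WindowFunction<Event, Batch, String, TimeWindow>() {
          @Override
          public void apply(String key, TimeWindow window, Iterable<Event> input, Collector<Batch> out) {
            out.collect(Batch.of(input)); // collapse the window contents into one batch for the sink
          }
        })
        .addSink(new BatchSink()); // chained with the window operator, no extra shuffle
  }

  // Placeholder types, only to keep the sketch self-contained.
  static class Event { String key; }
  static class Batch { static Batch of(Iterable<Event> in) { return new Batch(); } }
  static class BatchSink implements SinkFunction<Batch> {
    @Override public void invoke(Batch value, Context context) { /* write the batch */ }
  }
}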
Re: JobMaster does not register with ResourceManager in high availability setup
Hi, It seems the leader info has been published but since you don't turn on DEBUG log on org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService still we can only *guess* the retrieval service in JobMaster doesn't get notified and even I don't see a INFO level log Starting ZooKeeperLeaderRetrievalService ... so that I suspect whether the retrieval service normally started. Best, tison. Bajaj, Abhinav 于2020年3月23日周一 下午1:55写道: > Hi Yang, Tison, > > > > I think I was to reproduce the issue with a simpler job with DEBUG logs > enabled on below classes – > > org.apache.flink.runtime.executiongraph.ExecutionGraph > > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService > > org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint > > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager > > org.apache.flink.runtime.dispatcher.StandaloneDispatcher > > org.apache.flink.runtime.jobmaster.JobManagerRunner > > org.apache.flink.runtime.jobmaster.JobMaster > > > > I have attached the logs. > > > > I highly appreciate the help. > > > > ~ Abhinav Bajaj > > > > *From: *Yang Wang > *Date: *Wednesday, March 18, 2020 at 12:14 AM > *To: *tison > *Cc: *Xintong Song , "Bajaj, Abhinav" < > abhinav.ba...@here.com>, "user@flink.apache.org" > *Subject: *Re: JobMaster does not register with ResourceManager in high > availability setup > > > > It seems that your zookeeper service is not stable. From the the log i > find that resourcemanager > > leader is granted and taskmanager could register to resourcemanager > successfully. That means > > the resourcemanager address has been published to the ZK successfully. > > > > Also a ZooKeeperLeaderRetrievalService has been started successfully for > the new started > > jobmaster. However, the ZK listener did not get notified for the new > resourcemanager leader. > > So the jobmaster could not allocate resource from resourcemanager and > failed with "NoResourceAvailableException". > > > > Just like tison said, i think you need to provide the jobmanager log with > DEBUG level. Or try > > to make the ZK service as stable as possible. > > > > > > Best, > > Yang > > > > tison 于2020年3月18日周三 上午11:20写道: > > Sorry I mixed up the log, it belongs to previous failure. > > > > Could you trying to reproduce the problem with DEBUG level log? > > > > From the log we knew that JM & RM had been elected as leader but the > listener didn't work. However, we didn't know it is because the leader > didn't publish the leader info or the listener didn't get notified. > > > > Best, > > tison. > > > > > > tison 于2020年3月18日周三 上午10:40写道: > > Hi Abhinav, > > > > The problem is > > > > Curator: Background operation retry gave up > > > > So it is the ZK ensemble too unstable to get recovery in time so that > Curator stopped retrying and threw a fatal error. > > > > Best, > > tison. > > > > > > Xintong Song 于2020年3月18日周三 上午10:22写道: > > I'm not familiar with ZK either. > > > > I've copied Yang Wang, who might be able to provide some suggestions. > > > > Alternatively, you can try to post your question to the Apache ZooKeeper > community, see if they have any clue. > > > Thank you~ > > Xintong Song > > > > > > On Wed, Mar 18, 2020 at 8:12 AM Bajaj, Abhinav > wrote: > > Hi Xintong, > > > > I did check the Zk logs and didn’t notice anything interesting. > > I have limited expertise in zookeeper. > > Can you share an example of what I should be looking for in Zk? 
> > > > I was able to reproduce this issue again with Flink 1.7 by killing the > zookeeper leader that disrupted the quorum. > > The sequence of logs in this case look quite similar to one we have been > discussing. > > > > If the code hasn’t changed in this area till 1.10 then maybe the latest > version also has the potential issue. > > > > Its not straightforward to bump up the Flink version in the infrastructure > available to me. > > But I will think if there is a way around it. > > > > ~ Abhinav Bajaj > > > > *From: *Xintong Song > *Date: *Monday, March 16, 2020 at 8:00 PM > *To: *"Bajaj, Abhinav" > *Cc: *"user@flink.apache.org" > *Subject: *Re: JobMaster does not register with ResourceManager in high > availability setup > > > > Hi Abhinav, > > > > I think you are right. The log co
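For the DEBUG logging mentioned above, the relevant loggers can be enabled in the JobManager's log4j.properties, for example (log4j 1.x syntax as shipped with these Flink versions):

log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG
log4j.logger.org.apache.flink.runtime.leaderelection=DEBUG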
Javadocs Broken?
Hi guys, Right now when I click "JavaDocs" in out docsite[1] it jumps to a page[2] I think is definitely not out api documentation. Any thoughts? Best, tison. [1] https://ci.apache.org/projects/flink/flink-docs-master/ [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/
Re: Javadocs Broken?
s/out/our/
Schema with TypeInformation or DataType
Hi, I notice that our type system has two branches. One is TypeInformation while the other is DataType. It is said that Table API will use DataType but there are several questions about this statement: 1. Will TypeInformation be deprecated and we use DataType as type system everywhere? 2. Schema in Table API currently support only TypeInformation to register a field, shall we support the DataType way as well? Best, tison.
Re: Schema with TypeInformation or DataType
Thanks for your inputs and sorry that I said Schema doesn't support DataType to register a field because I was looking into Flink 1.9 codes... Best, tison. Jark Wu 于2020年4月17日周五 下午2:42写道: > Hi Tison, > > Migration from TypeInformation to DataType is a large work and will across > many releases. As far as I can tell, we will finalize the work in 1.11. > As godfrey said above, Flink SQL & Table API should always use DataType, > DataStream uses TypeInformation. > > Schema already supports DataType to register a field, and the the method > using TypeInformation to register field is deprecated since 1.10. > > Best, > Jark > > On Fri, 17 Apr 2020 at 14:14, tison wrote: > >> Hi, >> >> I notice that our type system has two branches. One is TypeInformation >> while the other is >> DataType. It is said that Table API will use DataType but there are >> several questions about >> this statement: >> >> 1. Will TypeInformation be deprecated and we use DataType as type system >> everywhere? >> 2. Schema in Table API currently support only TypeInformation to register >> a field, shall we support >> the DataType way as well? >> >> Best, >> tison. >> >
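For reference, a small sketch of registering fields on the table descriptor Schema with DataType instead of TypeInformation, as it looks around Flink 1.10 (the connector part is omitted):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Schema;

public class SchemaWithDataType {
  public static void main(String[] args) {
    Schema schema = new Schema()
        .field("user_id", DataTypes.BIGINT())        // DataType-based field registration
        .field("name", DataTypes.STRING())
        .field("event_time", DataTypes.TIMESTAMP(3));
    // The TypeInformation-based field(String, TypeInformation) variant still exists
    // but is deprecated in favor of the DataType-based one.
    System.out.println(schema.toProperties());
  }
}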
Re: Flink Conf "yarn.flink-dist-jar" Question
Hi Yang, I agree that these two of works would benefit from single assignee. My concern is as below 1. Both share libs & remote flink dist/libs are remote ship files. I don't think we have to implement multiple codepath/configuration. 2. So, for concept clarification, there are (1) an option to disable shipping local libs (2) flink-dist supports multiple schema at least we said "hdfs://" (3) an option for registering remote shipfiles with path & visibility. I think new configuration system helps. the reason we have to special handling (2) instead of including it in (3) is because when shipping flink-dist to TM container, we specially detect flink-dist. Of course we can merge it into general ship files and validate shipfiles finally contain flink-dist, which is an alternative. The *most important* difference is (1) and (3) which we don't have an option for only remote libs. Is this clarification satisfy your proposal? Best, tison. Till Rohrmann 于2020年4月17日周五 下午7:49写道: > Hi Yang, > > from what I understand it sounds reasonable to me. Could you sync with > Tison on FLINK-14964 on how to proceed. I'm not super deep into these > issues but they seem to be somewhat related and Tison already did some > implementation work. > > I'd say it be awesome if we could include this kind of improvement into > the release. > > Cheers, > Till > > On Thu, Apr 16, 2020 at 4:43 AM Yang Wang wrote: > >> Hi All, thanks a lot for reviving this discussion. >> >> I think we could unify the FLINK-13938 and FLINK-14964 since they have >> the similar >> purpose, avoid unnecessary uploading and downloading jars in YARN >> deployment. >> The difference is FLINK-13938 aims to support the flink system lib >> directory only, while >> FLINK-14964 is trying to support arbitrary pre-uloaded jars(including >> user and system jars). >> >> >> So i suggest to do this feature as following. >> 1. Upload the flink lib directory or users to hdfs, e.g. >> "hdfs://hdpdev/flink/release/flink-1.x" >> "hdfs://hdpdev/user/someone/mylib" >> 2. Use the -ypl argument to specify the shared lib, multiple directories >> could be specified >> 3. YarnClusterDescriptor will use the pre-uploaded jars to avoid >> unnecessary uploading, >> both for system and user jars >> 4. YarnClusterDescriptor needs to set the system jars to public >> visibility so that the distributed >> cache in the YARN nodemanager could be reused by multiple applications. >> This is to avoid >> unnecessary downloading, especially for the "flink-dist-*.jar". For the >> user shared lib, the >> visibility is still set to "APPLICATION" level. >> >> >> For our past internal use case, the shared lib could help with >> accelerating the submission a lot. >> Also it helps to reduce the pressure of HDFS when we want to launch many >> applications together. >> >> @tison @Till Rohrmann @Hailu, Andreas >> If you guys thinks the suggestion makes sense. I >> will try to find some time to work on this and hope it could catch up >> with release-1.1 cycle. >> >> >> Best, >> Yang >> >> Hailu, Andreas [Engineering] 于2020年4月16日周四 >> 上午8:47写道: >> >>> Okay, I’ll continue to watch the JIRAs. Thanks for the update, Till. 
>>> >>> >>> >>> *// *ah >>> >>> >>> >>> *From:* Till Rohrmann >>> *Sent:* Wednesday, April 15, 2020 10:51 AM >>> *To:* Hailu, Andreas [Engineering] >>> *Cc:* Yang Wang ; tison ; >>> user@flink.apache.org >>> *Subject:* Re: Flink Conf "yarn.flink-dist-jar" Question >>> >>> >>> >>> Hi Andreas, >>> >>> >>> >>> it looks as if FLINK-13938 and FLINK-14964 won't make it into the 1.10.1 >>> release because the community is about to start the release process. Since >>> FLINK-13938 is a new feature it will be shipped with a major release. There >>> is still a bit of time until the 1.11 feature freeze and if Yang Wang has >>> time to finish this PR, then we could ship it. >>> >>> >>> >>> Cheers, >>> >>> Till >>> >>> >>> >>> On Wed, Apr 15, 2020 at 3:23 PM Hailu, Andreas [Engineering] < >>> andreas.ha...@gs.com> wrote: >>> >>> Yang, Tison, >>> >>> >>> >>> Do we know when some solution for 13938 and 14964 will arrive? Do you >>> think it will be in a 1.10.x version? >>> >>&
Re: Can I use Joda-Time in Flink?
Hi Alexander, What do you mean exactly? Could you describe it in pseudo code? I'm not quite sure where Java-Time is used in the environment. Best, tison. Alexander Borgschulze 于2020年4月17日周五 下午9:21写道: > Can I use Joda-Time instead of Java-Time and set it up in the > StreamExecutionEnvironment? >
Re: multi-sql checkpoint fail
Hi, Could you share the stack traces? Best, tison. forideal 于2020年4月18日周六 上午12:33写道: > Hello friend > I have two SQL, checkpoint fails all the time. One task is to open a > sliding window for an hour, and then another task consumes the output data > of the previous task. There will be no problem with the two tasks submitted > separately. > > -- first Calculation-- second Write the calculation to redis-- firstinsert > into > dw_access_logselect > time_key, > query_nor, > query_nor_counter, > '1' as group_keyfrom( > select > HOP_START( > event_time_fake, > interval '1' MINUTE, > interval '60' MINUTE > ) as time_key, > query_nor, > count(1) as query_nor_counter > from( > select > RED_JSON_VALUE(request, '$.query_nor') as query_nor, > RED_JSON_VALUE(request, '$.target') as target, > event_time_fake > from > ( > select > red_pb_parser(body, 'request') as request, > event_time_fake > from > access_log_source > ) > ) > group by > query_nor, > HOP( -- sliding window size one hour, step one minute > event_time_fake, > interval '1' MINUTE, > interval '60' MINUTE > ) > )where > query_nor_counter > 100; > -- secondinsert into > dw_sink_access_logselect > 'fix_key' as `key`, > get_json_value(query_nor, query_nor_counter) as `value` -- agg_funcfrom > dw_access_loggroup by > tumble (time_key_fake, interval '1' MINUTE), > group_key > > Article Link:https://zhuanlan.zhihu.com/p/132764573 > Picture Link: > https://pic4.zhimg.com/80/v2-d3b1105b1419fef3ea6b9176085a5597_1440w.jpg > https://pic3.zhimg.com/80/v2-b6ea7b4a8368c4bae03afb94c723bcca_1440w.jpg > > Best, forideal > > > > > >
Re: Akka Error
If you run a program using "flink run" in dist/bin, dependencies should be taken care of. Could you describe in detail how you "start a Flink program"? Did you write an entrypoint, compile it, and run it with "java YourProgram"? If so, you need to configure the classpath yourself. Best, tison. Alexander Borgschulze 于2020年4月18日周六 上午3:03写道: > When I try to start a flink program, I get the following exception: > > com.typesafe.config.ConfigException$Missing: No configuration setting > found for key 'akka.version' > at > com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152) > at > com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170) > at > com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176) > > > Do I need to change some files and add a new key-value pair? >
Re: Flink Conf "yarn.flink-dist-jar" Question
Hi Yang, Name filtering & schema special handling makes sense for me. We can enrich later if there is requirement without breaking interface. For #1, from my perspective your first proposal is having an option specifies remote flink/lib, then we turn off auto uploading local flink/lib and register that path as local resources It seems we here add another special logic for handling one kind of things...what I propose is we do these two steps explicitly separated: 1. an option turns off auto uploading local flink/lib 2. a general option register remote files as local resources The rest thing here is that you propose we handle flink/lib as PUBLIC visibility while other files as APPLICATION visibility, whether a composite configuration or name filtering to special handle libs makes sense though. YarnClusterDescriptor already has a lot of special handling logics which introduce a number of config options and keys, which should have been configured in few of common options and validated at the runtime. Best, tison. Yang Wang 于2020年4月17日周五 下午11:42写道: > Hi tison, > > For #3, if you mean registering remote HDFS file as local resource, we > should make the "-yt/--yarnship" > to support remote directory. I think it is the right direction. > > For #1, if the users could ship remote directory, then they could also > specify like this > "-yt hdfs://hdpdev/flink/release/flink-1.x, > hdfs://hdpdev/user/someone/mylib". Do you mean we add an > option for whether trying to avoid unnecessary uploading? Maybe we could > filter by names and file size. > I think this is a good suggestion, and we do not need to introduce a new > config option "-ypl". > > For #2, for flink-dist, the #1 could already solve the problem. We do not > need to support remote schema. > It will confuse the users when we only support HDFS, not S3, OSS, etc. > > > Best, > Yang > > tison 于2020年4月17日周五 下午8:05写道: > >> Hi Yang, >> >> I agree that these two of works would benefit from single assignee. My >> concern is as below >> >> 1. Both share libs & remote flink dist/libs are remote ship files. I >> don't think we have to implement multiple codepath/configuration. >> 2. So, for concept clarification, there are >> (1) an option to disable shipping local libs >> (2) flink-dist supports multiple schema at least we said "hdfs://" >> (3) an option for registering remote shipfiles with path & visibility. >> I think new configuration system helps. >> >> the reason we have to special handling (2) instead of including it in (3) >> is because when shipping flink-dist to TM container, we specially >> detect flink-dist. Of course we can merge it into general ship files and >> validate shipfiles finally contain flink-dist, which is an alternative. >> >> The *most important* difference is (1) and (3) which we don't have an >> option for only remote libs. Is this clarification satisfy your proposal? >> >> Best, >> tison. >> >> >> Till Rohrmann 于2020年4月17日周五 下午7:49写道: >> >>> Hi Yang, >>> >>> from what I understand it sounds reasonable to me. Could you sync with >>> Tison on FLINK-14964 on how to proceed. I'm not super deep into these >>> issues but they seem to be somewhat related and Tison already did some >>> implementation work. >>> >>> I'd say it be awesome if we could include this kind of improvement into >>> the release. >>> >>> Cheers, >>> Till >>> >>> On Thu, Apr 16, 2020 at 4:43 AM Yang Wang wrote: >>> >>>> Hi All, thanks a lot for reviving this discussion. 
>>>> >>>> I think we could unify the FLINK-13938 and FLINK-14964 since they have >>>> the similar >>>> purpose, avoid unnecessary uploading and downloading jars in YARN >>>> deployment. >>>> The difference is FLINK-13938 aims to support the flink system lib >>>> directory only, while >>>> FLINK-14964 is trying to support arbitrary pre-uloaded jars(including >>>> user and system jars). >>>> >>>> >>>> So i suggest to do this feature as following. >>>> 1. Upload the flink lib directory or users to hdfs, e.g. >>>> "hdfs://hdpdev/flink/release/flink-1.x" >>>> "hdfs://hdpdev/user/someone/mylib" >>>> 2. Use the -ypl argument to specify the shared lib, multiple >>>> directories could be specified >>>> 3. YarnClusterDescriptor will use the pre-uploaded jars to avoid >>>> unnecessary uploading, >>>&
Re: Job manager URI rpc address:port
You can change the flink-conf.yaml "jobmanager.rpc.address" and "jobmanager.rpc.port" options before running the program, or take a look at RemoteStreamEnvironment, which lets you configure the host and port explicitly. Best, tison. Som Lima 于2020年4月19日周日 下午5:58写道: > Hi, > > After running > > $ ./bin/start-cluster.sh > > The following line of code defaults jobmanager to localhost:6123 > > final ExecutionEnvironment env = Environment.getExecutionEnvironment(); > > which is same on spark. > > val spark = > SparkSession.builder.master(local[*]).appname("anapp").getOrCreate > > However if I wish to run the servers on a different physical computer. > Then in Spark I can do it this way using the spark URI in my IDE. > > Conf = > SparkConf().setMaster("spark://:").setAppName("anapp") > > Can you please tell me the equivalent change to make so I can run my > servers and my IDE from different physical computers. > > > > > > > > > > > > >
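For the record, a minimal sketch of the RemoteStreamEnvironment route mentioned above; the host, port and jar path are placeholders, and which port to use (legacy RPC 6123 vs. REST 8081) depends on the Flink version:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmissionExample {
    public static void main(String[] args) throws Exception {
        // Connect to a JobManager running on a different machine than the IDE.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host",       // placeholder: the remote jobmanager.rpc.address
                8081,                    // placeholder: REST port (newer versions) or 6123 (legacy RPC)
                "/path/to/your-job.jar"  // jar(s) with the user code, shipped to the cluster
        );

        env.fromElements(1, 2, 3).print();
        env.execute("remote-submission-example");
    }
}
```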
Re: Flink 1.10.0 stop command
'flink cancel' broken because of https://issues.apache.org/jira/browse/FLINK-16626 Best, tison. Yun Tang 于2020年4月23日周四 上午1:18写道: > Hi > > I think you could still use ./bin/flink cancel to cancel the job. > What is the exception thrown? > > Best > Yun Tang > -- > *From:* seeksst > *Sent:* Wednesday, April 22, 2020 18:17 > *To:* user > *Subject:* Flink 1.10.0 stop command > > > Hi, > > >When i test 1.10.0, i found i must to set savepoint path otherwise i > can’t stop the job. I confuse about this, beacuse as i know, savepoint > offen large than checkpoint, so i usually resume job from checkpoint. > Another problem is sometimes job throw exception and i can’t trigger a > savepoint, so i cancel the job and change logical, resume it from last > checkpoint. although sometimes will failed, i think this will be a better > way, because i can choose cancel with a savepoint or not, so i can decede > how to resume. but in 1.10.0, i must to set it, and seems system will > trigger savepoint, i think this will take more risk, and it will delete > checkpoint even i set retain on cancellation. so i have no checkpoint left. > If i use cancel , it will break with exception. > > So how to work with 1.10.0 ? any advice will be helpful. > > Thanks. > > >
Re: Flink 1.10.0 stop command
To be precise, the cancel command would succeed on the cluster side, but the response *might* be lost, so the client throws a TimeoutException. If that is the case, this is the root cause, and it will be fixed in 1.10.1. Best, tison. tison 于2020年4月23日周四 上午1:20写道: > 'flink cancel' broken because of > https://issues.apache.org/jira/browse/FLINK-16626 > > Best, > tison. > > > Yun Tang 于2020年4月23日周四 上午1:18写道: > >> Hi >> >> I think you could still use ./bin/flink cancel to cancel the job. >> What is the exception thrown? >> >> Best >> Yun Tang >> -- >> *From:* seeksst >> *Sent:* Wednesday, April 22, 2020 18:17 >> *To:* user >> *Subject:* Flink 1.10.0 stop command >> >> >> Hi, >> >> >>When i test 1.10.0, i found i must to set savepoint path otherwise i >> can’t stop the job. I confuse about this, beacuse as i know, savepoint >> offen large than checkpoint, so i usually resume job from checkpoint. >> Another problem is sometimes job throw exception and i can’t trigger a >> savepoint, so i cancel the job and change logical, resume it from last >> checkpoint. although sometimes will failed, i think this will be a better >> way, because i can choose cancel with a savepoint or not, so i can decede >> how to resume. but in 1.10.0, i must to set it, and seems system will >> trigger savepoint, i think this will take more risk, and it will delete >> checkpoint even i set retain on cancellation. so i have no checkpoint left. >> If i use cancel , it will break with exception. >> >> So how to work with 1.10.0 ? any advice will be helpful. >> >> Thanks. >> >> >> >
Re: java.lang.NoSuchMethodError while writing to Kafka from Flink
Could you try to download the binary distribution from the Flink download page and re-execute the job? It seems something is wrong with your flink-dist.jar. BTW, please post user questions only to the user mailing list (not dev). Best, tison. Guowei Ma 于2020年5月25日周一 上午10:49写道: > Hi > 1. You could check whether the 'org.apache.flink.api.java.clean' is in > your classpath first. > 2. Do you follow the doc[1] to deploy your local cluster and run some > existed examples such as WordCount? > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/cluster_setup.html > Best, > Guowei >
[DISCUSS] Semantic and implementation of per-job mode
(CC user list because I think users may have ideas on how per-job mode should look like)

Hi all,

In the discussion about Flink on k8s[1] we encountered a problem: opinions diverge on how the so-called per-job mode should work. This thread is aimed at starting a dedicated discussion about per-job semantics and how to implement them.

**The AS IS per-job mode**

* In standalone deployment, we bundle the user jar with the Flink jar, retrieve the JobGraph (the very first JobGraph from the user program on the classpath), and then start a Dispatcher with this JobGraph preconfigured, which launches it as a "recovered" job.

* In YARN deployment, we accept submission via CliFrontend, extract the JobGraph (the very first JobGraph from the submitted user program), serialize the JobGraph and upload it to YARN as a resource, and then, when the AM starts, retrieve the JobGraph as a resource and start a Dispatcher with this JobGraph preconfigured; the rest is the same. Specifically, in order to support multi-part jobs, if the YARN deployment is configured as "attached", it starts a SessionCluster, drives the progress and shuts down the cluster when the job finishes.

**Motivation**

The implementation mentioned above, however, suffers from problems. The major two of them are

1. only the very first JobGraph from the user program is respected
2. the job is compiled on the client side

1. Only the very first JobGraph from the user program is respected

There is already an issue about this topic[2]. As we extract the JobGraph from the user program by hijacking Environment#execute, we actually abort any execution after the first call to #execute. Besides, it has surprised users many times that logic they write in the program is possibly never executed; the underlying problem is the semantics of "job" from Flink's perspective. I'd like to say that in the current implementation "per-job" is actually "per-job-graph". However, in practice, since we support jar submission, it is "per-program" semantics that users want.

2. Compile the job on the client side

Well, standalone deployment is not in this case. But in YARN deployment, we compile the job and get the JobGraph on the client side, and then upload it to YARN. This approach, however, somewhat breaks isolation. We have observed user programs containing exception handling logic which calls System.exit in the main method, so that compiling the job exits the whole client at once. This is a critical problem if we manage multiple Flink jobs in a single platform: in that case, it shuts down the whole service. Besides, I have been asked many times why per-job mode doesn't run "just like" session mode but with a dedicated cluster. It might imply that the current implementation mismatches users' demands.

**Proposal**

In order to provide a "per-program" semantic mode which acts "just like" session mode but with a dedicated cluster, I propose the workflow below. It acts like starting a driver on the cluster, but it is not a general driver solution as proposed here[3]; the main purpose of the workflow below is to provide a "per-program" semantic mode.

*From CliFrontend*

1. CliFrontend receives the submission, gathers all configuration and starts a corresponding ClusterDescriptor.
2. ClusterDescriptor deploys a cluster with main class ProgramClusterEntrypoint while shipping resources including the user program.
3. ProgramClusterEntrypoint#main contains the logic for starting components including a Standalone Dispatcher, configuring the user program to start a RpcClusterClient, and then invoking the main method of the user program.
4. RpcClusterClient acts like MiniClusterClient, which is able to submit the JobGraph after the leader is elected, so that we don't fall back to round-robin or fail the submission due to no leader.
5. Whether or not to deliver the job result depends on user program logic, since we can already get a JobClient from execute. ProgramClusterEntrypoint exits when the user program exits and all jobs submitted globally terminate.

This way fits in the direction of FLIP-73, because the strategy of starting a RpcClusterClient can be regarded as a special Executor. After ProgramClusterEntrypoint#main starts a Cluster, it generates and passes configuration to the user program, so that when the Executor is generated, it knows to use a RpcClusterClient for submission and the address of the Dispatcher.

**Compatibility**

In my opinion this mode can be totally an add-on to the current codebase. We don't actually replace the current per-job mode with the so-called "per-program" mode. It may happen that the current per-job mode becomes useless once we have such a "per-program" mode, so that we possibly deprecate it in favor of the other.

I'm glad to discuss more details if you're interested, but let's say we'd better first reach a consensus on the overall design :-)

Looking forward to your reply!

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-9953
[2] https://issues.apache.org/jira/browse/FLINK-10879
[3] https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
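As an illustration of the "per-job-graph vs. per-program" distinction above, here is a minimal sketch of a user program with more than one call to execute(); under the current per-job(-graph) mode only the first call is effectively honored, while the proposed per-program mode would run main() to completion on the cluster:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MultiJobProgram {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // First JobGraph: in per-job(-graph) mode the client hijacks #execute,
        // extracts this graph and stops here.
        env.fromElements("a", "b", "c").print();
        env.execute("job-1");

        // Second JobGraph and any follow-up logic: possibly never executed today,
        // which is exactly the surprise described in motivation #1.
        env.fromElements(1, 2, 3).print();
        env.execute("job-2");
    }
}
```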
Re: [DISCUSS] Semantic and implementation of per-job mode
Thanks for your attentions! @shixiaoga...@gmail.com Yes correct. We try to avoid jobs affect one another. Also a local ClusterClient in case saves the overhead about retry before leader elected and persist JobGraph before submission in RestClusterClient as well as the net cost. @Paul Lam 1. Here is already a note[1] about multiple part jobs. I am also confused a bit on this concept at first :-) Things go in similar way if you program contains the only JobGraph so that I think per-program acts like per-job-graph in this case which provides compatibility for many of one job graph program. Besides, we have to respect user program which doesn't with current implementation because we return abruptly when calling env#execute which hijack user control so that they cannot deal with the job result or the future of it. I think this is why we have to add a detach/attach option. 2. For compilation part, I think it could be a workaround that you upload those resources in a commonly known address such as HDFS so that compilation can read from either client or cluster. Best, tison. [1] https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430 Newport, Billy 于2019年10月30日周三 下午10:41写道: > We execute multiple job graphs routinely because we cannot submit a single > graph without it blowing up. I believe Regina spoke to this in Berlin > during her talk. We instead if we are processing a database ingestion with > 200 tables in it, we do a job graph per table rather than a single job > graph that does all tables instead. A single job graph can be in the tens > of thousands of nodes in our largest cases and we have found flink (as os > 1.3/1.6.4) cannot handle graphs of that size. We’re currently testing 1.9.1 > but have not retested the large graph scenario. > > > > Billy > > > > > > *From:* Paul Lam [mailto:paullin3...@gmail.com] > *Sent:* Wednesday, October 30, 2019 8:41 AM > *To:* SHI Xiaogang > *Cc:* tison; dev; user > *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode > > > > Hi, > > > > Thanks for starting the discussion. > > > > WRT the per-job semantic, it looks natural to me that per-job means > per-job-graph, > > because in my understanding JobGraph is the representation of a job. Could > you > > share some use case in which a user program should contain multiple job > graphs? > > > > WRT the per-program mode, I’m also in flavor of a unified cluster-side > execution > > for user program, so +1 from my side. > > > > But I think there may be some values for the current per-job mode: we now > have > > some common resources available on the client machine that would be read > by main > > methods in user programs. If migrated to per-program mode, we must > explicitly > > set the specific resources for each user program and ship them to the > cluster, > > it would be a bit inconvenient. Also, as the job graph is compiled at the > client, > > we can recognize the errors caused by user code before starting the > cluster > > and easily get access to the logs. > > > > Best, > > Paul Lam > > > > 在 2019年10月30日,16:22,SHI Xiaogang 写道: > > > > Hi > > > > Thanks for bringing this. > > > > The design looks very nice to me in that > > 1. In the new per-job mode, we don't need to compile user programs in the > client and can directly run user programs with user jars. That way, it's > easier for resource isolation in multi-tenant platforms and is much safer. > > 2. 
The execution of user programs can be unified in session and per-job > modes. In session mode, user jobs are submitted via a remote ClusterClient > while in per-job mode user jobs are submitted via a local ClusterClient. > > > > Regards, > > Xiaogang > > > > tison 于2019年10月30日周三 下午3:30写道: > > (CC user list because I think users may have ideas on how per-job mode > should look like) > > > > Hi all, > > In the discussion about Flink on k8s[1] we encounter a problem that > opinions > diverge in how so-called per-job mode works. This thread is aimed at > stating > a dedicated discussion about per-job semantic and how to implement it. > > **The AS IS per-job mode** > > * in standalone deployment, we bundle user jar with Flink jar, retrieve > JobGraph which is the very first JobGraph from user program in classpath, > and then start a Dispatcher with this JobGraph preconfigured, which > launches it as "recovered" job. > > * in YARN deployment, we accept submission via CliFrontend, extract > JobGraph > which is the very f
Re: [DISCUSS] Semantic and implementation of per-job mode
Hi all, Thanks for your participation! First of all I have to clarify two confusion in this thread. 1. The proposed "pre-program" mode is definitely a new mode opt-in. It is described in "Compatibility" section of the original email. 2. The documentation linked in the original email "Flink driver" is NOT the proposed design. See also the original paragraph below I'm sorry for link that in the first email which causes further confusion. >In order to provide a "per-program" semantic mode which acts "just like" session >mode but with a dedicated cluster, I propose a workflow as below. It acts like >starting a drive on cluster but is NOT a general driver solution as proposed >here[3], the main purpose of the workflow below is for providing a "per-program" >semantic mode. I'm reading detailedly your ideas and writing reply now :-) Best, tison. Peter Huang 于2019年11月1日周五 下午12:47写道: > Hi Tison and Community, > > Thanks for bringing it up. Actually, we meet a similar bottleneck of using > per cluster mode. Our team built a service for deploying and operating > Flink jobs. > The service sits in front of yarn clusters. To submit different job jars, > we need to download client jar into the service and generate a job > graph which is time-consuming. > Thus, we find an idea of Delayed Job Graph to make the job graph > generation in ClusterEntryPoint rather than on the client-side. Compare to > your proposal, it is more lightweight, > and it is an option for existing per job mode. But it is not a solution > for handling multiple job graph within a program. > > I am looking forward to more comments on the proposal, and also definitely > cooperation on this effort. > I hope both of our pain points can be resolved and contribute back to the > community. > > > > https://docs.google.com/document/d/1aAwVjdZByA-0CHbgv16Me-vjaaDMCfhX7TzVVTuifYM/edit?ts=5da1f4d7#heading=h.be92q3uiam4t > > > Best Regards > Peter Huang > > > > > > > > > > > > > > > > > > On Thu, Oct 31, 2019 at 8:17 PM bupt_ljy wrote: > >> Hi all, >> >> >> Firstly thanks @tison for bring this up and strongly +1 for the overall >> design. >> >> >> I’d like to add one more example of "multiple jobs in one program" with >> what I’m currently working on. I’m trying to run a TPC-DS benchmark testing >> (including tens of sql query job) on Flink and sufferring a lot from >> maintaining the client because I can’t run this program in per-job mode and >> have to make the client attached. >> >> >> Back to our discussion, I can see now there is a divergence of compiling >> the job graph between in client and in #ClusterEntrypoint. And up and >> downsides exist in either way. As for the opt-in solution, I have a >> question, what if the user chooses detach mode, compiling in the client and >> runs a multi-job program at the same time? And it still not gonna work. >> >> Besides, by adding an compiling option, we need to consider more things >> when submitting a job like "Is my program including multiple job?" or "Does >> the program need to be initialized before submitting to a remote cluster?", >> which looks a bit complicated and confusing to me. >> >> >> By summarizing, I'll vote for the per-program new concept but I may not >> prefer the opt-in option mentioned in the mailing list or maybe we need to >> reconsider a better concept and definition which is easy to understand. 
>> >> >> >> Best, >> >> Jiayi Liao >> >> Original Message >> *Sender:* Rong Rong >> *Recipient:* Regina" >> *Cc:* Theo Diefenthal; >> user@flink.apache.org >> *Date:* Friday, Nov 1, 2019 11:01 >> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode >> >> Hi All, >> >> Thanks @Tison for starting the discussion and I think we have very >> similar scenario with Theo's use cases. >> In our case we also generates the job graph using a client service (which >> serves multiple job graph generation from multiple user code) and we've >> found that managing the upload/download between the cluster and the DFS to >> be trick and error-prone. In addition, the management of different >> environment and requirement from different user in a single service posts >> even more trouble for us. >> >> However, shifting the job graph generation towards the cluster side also >> requires some thoughts regarding how to manage the driver-job as well as >> some dependencies conflicts - In the
Re: [DISCUSS] Semantic and implementation of per-job mode
Hi Peter, I checked out you proposal FLIP-85 and think that we are in the very similar direction. For any reason in your proposal we can create PackagedProgram in server side and thus if we can configure environment properly we can directly invoke main method. In addition to your design document, in fact PackagedProgram is unnecessary to be a class in flink-client. With related Exceptions move to flink-runtime it can be a flink-runtime concept now. And thus we don't suffer from dependency conflict actually. Best, tison. tison 于2019年11月1日周五 下午2:17写道: > Hi all, > > Thanks for your participation! First of all I have to clarify two > confusion in this thread. > > 1. The proposed "pre-program" mode is definitely a new mode opt-in. It is > described in > "Compatibility" section of the original email. > > 2. The documentation linked in the original email "Flink driver" is NOT > the proposed design. > See also the original paragraph below I'm sorry for link that in the first > email which causes > further confusion. > > >In order to provide a "per-program" semantic mode which acts "just like" > session > >mode but with a dedicated cluster, I propose a workflow as below. It acts > like > >starting a drive on cluster but is NOT a general driver solution as > proposed > >here[3], the main purpose of the workflow below is for providing a > "per-program" > >semantic mode. > > I'm reading detailedly your ideas and writing reply now :-) > > Best, > tison. > > > Peter Huang 于2019年11月1日周五 下午12:47写道: > >> Hi Tison and Community, >> >> Thanks for bringing it up. Actually, we meet a similar bottleneck of >> using per cluster mode. Our team built a service for deploying and >> operating Flink jobs. >> The service sits in front of yarn clusters. To submit different job jars, >> we need to download client jar into the service and generate a job >> graph which is time-consuming. >> Thus, we find an idea of Delayed Job Graph to make the job graph >> generation in ClusterEntryPoint rather than on the client-side. Compare to >> your proposal, it is more lightweight, >> and it is an option for existing per job mode. But it is not a solution >> for handling multiple job graph within a program. >> >> I am looking forward to more comments on the proposal, and also >> definitely cooperation on this effort. >> I hope both of our pain points can be resolved and contribute back to the >> community. >> >> >> >> https://docs.google.com/document/d/1aAwVjdZByA-0CHbgv16Me-vjaaDMCfhX7TzVVTuifYM/edit?ts=5da1f4d7#heading=h.be92q3uiam4t >> >> >> Best Regards >> Peter Huang >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> On Thu, Oct 31, 2019 at 8:17 PM bupt_ljy wrote: >> >>> Hi all, >>> >>> >>> Firstly thanks @tison for bring this up and strongly +1 for the overall >>> design. >>> >>> >>> I’d like to add one more example of "multiple jobs in one program" with >>> what I’m currently working on. I’m trying to run a TPC-DS benchmark testing >>> (including tens of sql query job) on Flink and sufferring a lot from >>> maintaining the client because I can’t run this program in per-job mode and >>> have to make the client attached. >>> >>> >>> Back to our discussion, I can see now there is a divergence of compiling >>> the job graph between in client and in #ClusterEntrypoint. And up and >>> downsides exist in either way. As for the opt-in solution, I have a >>> question, what if the user chooses detach mode, compiling in the client and >>> runs a multi-job program at the same time? And it still not gonna work. 
>>> >>> Besides, by adding an compiling option, we need to consider more things >>> when submitting a job like "Is my program including multiple job?" or "Does >>> the program need to be initialized before submitting to a remote cluster?", >>> which looks a bit complicated and confusing to me. >>> >>> >>> By summarizing, I'll vote for the per-program new concept but I may not >>> prefer the opt-in option mentioned in the mailing list or maybe we need to >>> reconsider a better concept and definition which is easy to understand. >>> >>> >>> >>> Best, >>> >>> Jiayi Liao >>> >>> Original Message >>> *Sender:* Rong
Re: Flink (Local) Environment Thread Leaks?
We found this issue previous. In our case where leak thread comes from is tracked as https://issues.apache.org/jira/browse/FLINK-14565 Best, tison. vino yang 于2019年11月14日周四 上午10:15写道: > Hi Theo, > > If you think there is a thread leakage problem. You can create a JIRA > issue and write a detailed description. > > Ping @Gary Yao and @Zhu Zhu to > help to locate and analyze this problem? > > Best, > Vino > > Theo Diefenthal 于2019年11月14日周四 > 上午3:16写道: > >> I included a Solr End2End test in my project, inheriting from Junit 4 >> SolrCloudTestCase. >> >> The solr-test-framework for junit 4 makes use of >> com.carrotsearch.randomizedtesting >> which automatically tests for thread leakages on test end. In my other >> projects, that tool doesn't produce any problems. >> When used in a test together with a Flink LocalExecutionEnvironment, it >> will prevent the test from suceeding due the following error at shutdown >> phase: >> >> com.carrotsearch.randomizedtesting.ThreadLeakError: 3 threads leaked from >> SUITE scope at somepackage.E2ETest: >>1) Thread[id=170, name=FlinkCompletableFutureDelayScheduler-thread-1, >> state=TIMED_WAITING, group=TGRP-E2ETest] >> at sun.misc.Unsafe.park(Native Method) >> at >> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) >> at >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093) >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) >> at >> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> at java.lang.Thread.run(Thread.java:748) >>2) Thread[id=29, name=metrics-meter-tick-thread-2, state=WAITING, >> group=TGRP-E2ETest] >> at sun.misc.Unsafe.park(Native Method) >> at >> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) >> at >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088) >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) >> at >> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> at java.lang.Thread.run(Thread.java:748) >>3) Thread[id=28, name=metrics-meter-tick-thread-1, >> state=TIMED_WAITING, group=TGRP-E2ETest] >> at sun.misc.Unsafe.park(Native Method) >> at >> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) >> at >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093) >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) >> at >> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) >> at >> 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> at java.lang.Thread.run(Thread.java:748) >> >> at __randomizedtesting.SeedInfo.seed([CC6ED531AFECBAF6]:0) >> >> Note that I can suppress the errors easily via setting >> @ThreadLeakScope(ThreadLeakScope.Scope.NONE) in my tests, but I just want >> to point out possible thread leaks in the mailing list here. As the >> first thread is named FlinkCompletableFutureDelayScheduler, I suggest that >> Flink doesn't shut down some of its multitude of threads nicely in a local >> execution environment. My question: Is that some kind of problem / thread >> leakage in Flink or is it just a false warning? >> >> >> >>
Re: Flink (Local) Environment Thread Leaks?
It is because MiniCluster start a SystemResourcesCounter for gathering metrics but no logic for shutdown. Thus on cluster exist the thread leak. Best, tison. tison 于2019年11月14日周四 上午10:21写道: > We found this issue previous. > > In our case where leak thread comes from is tracked as > https://issues.apache.org/jira/browse/FLINK-14565 > > Best, > tison. > > > vino yang 于2019年11月14日周四 上午10:15写道: > >> Hi Theo, >> >> If you think there is a thread leakage problem. You can create a JIRA >> issue and write a detailed description. >> >> Ping @Gary Yao and @Zhu Zhu to >> help to locate and analyze this problem? >> >> Best, >> Vino >> >> Theo Diefenthal 于2019年11月14日周四 >> 上午3:16写道: >> >>> I included a Solr End2End test in my project, inheriting from Junit 4 >>> SolrCloudTestCase. >>> >>> The solr-test-framework for junit 4 makes use of >>> com.carrotsearch.randomizedtesting >>> which automatically tests for thread leakages on test end. In my other >>> projects, that tool doesn't produce any problems. >>> When used in a test together with a Flink LocalExecutionEnvironment, it >>> will prevent the test from suceeding due the following error at shutdown >>> phase: >>> >>> com.carrotsearch.randomizedtesting.ThreadLeakError: 3 threads leaked >>> from SUITE scope at somepackage.E2ETest: >>>1) Thread[id=170, name=FlinkCompletableFutureDelayScheduler-thread-1, >>> state=TIMED_WAITING, group=TGRP-E2ETest] >>> at sun.misc.Unsafe.park(Native Method) >>> at >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) >>> at >>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) >>> at >>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093) >>> at >>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) >>> at >>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> at java.lang.Thread.run(Thread.java:748) >>>2) Thread[id=29, name=metrics-meter-tick-thread-2, state=WAITING, >>> group=TGRP-E2ETest] >>> at sun.misc.Unsafe.park(Native Method) >>> at >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) >>> at >>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) >>> at >>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088) >>> at >>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) >>> at >>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> at java.lang.Thread.run(Thread.java:748) >>>3) Thread[id=28, name=metrics-meter-tick-thread-1, >>> state=TIMED_WAITING, group=TGRP-E2ETest] >>> at sun.misc.Unsafe.park(Native Method) >>> at >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) >>> at >>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) >>> at >>> 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093) >>> at >>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) >>> at >>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> at java.lang.Thread.run(Thread.java:748) >>> >>> at __randomizedtesting.SeedInfo.seed([CC6ED531AFECBAF6]:0) >>> >>> Note that I can suppress the errors easily via setting >>> @ThreadLeakScope(ThreadLeakScope.Scope.NONE) in my tests, but I just want >>> to point out possible thread leaks in the mailing list here. As the >>> first thread is named FlinkCompletableFutureDelayScheduler, I suggest that >>> Flink doesn't shut down some of its multitude of threads nicely in a local >>> execution environment. My question: Is that some kind of problem / thread >>> leakage in Flink or is it just a false warning? >>> >>> >>> >>>
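For completeness, the suppression Theo mentions looks roughly like the sketch below, assuming the com.carrotsearch.randomizedtesting annotations pulled in by the solr-test-framework; it silences the check rather than fixing the leak:

```java
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.junit.Test;

// Disables randomizedtesting's thread-leak check for this suite, as mentioned above.
// Note this also hides genuine leaks coming from the code under test.
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class E2ETest extends SolrCloudTestCase {

    @Test
    public void runsFlinkLocalEnvironmentWithoutLeakCheck() throws Exception {
        // ... start a Flink local execution environment and the Solr mini cluster here ...
    }
}
```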
[DISCUSS] Support configure remote flink jar
Hi folks, Recently, our customers have asked for a feature to configure a remote flink jar. I'd like to reach out to you to see whether or not it is a general need. ATM Flink only supports configuring a local file as the flink jar via the `-yj` option. If we pass an HDFS file path, due to an implementation detail it will fail with IllegalArgumentException. In the story where we support configuring a remote flink jar, this limitation is eliminated. We also make use of YARN locality to reduce uploading overhead by instead asking YARN to localize the jar when the AM container starts. Besides, it possibly overlaps with FLINK-13938. I'd like to put the discussion on our mailing list first. Are you looking forward to such a feature? @Yang Wang: this feature is different from the one we discussed offline; it only focuses on the flink jar, not all ship files. Best, tison.
Re: [DISCUSS] Support configure remote flink jar
Thanks for your participation! @Yang: Great to hear. I'd like to know whether or not a remote flink jar path conflicts with FLINK-13938. IIRC FLINK-13938 auto excludes local flink jar from shipping which possibly not works for the remote one. @Thomas: It inspires a lot URL becomes the unified representation of resource. I'm thinking of how to serve a unique process getting resource from URL which points to an artifact or distributed file system. @ouywl & Stephan: Yes this improvement can be migrated to environment like k8s, IIRC the k8s proposal already discussed about improvement using "init container" and other technologies. However, so far I regard it is an improvement different from one storage to another so that we achieve then individually. Best, tison. Stephan Ewen 于2019年11月20日周三 上午12:34写道: > Would that be a feature specific to Yarn? (and maybe standalone sessions) > > For containerized setups, and init container seems like a nice way to > solve this. Also more flexible, when it comes to supporting authentication > mechanisms for the target storage system, etc. > > On Tue, Nov 19, 2019 at 5:29 PM ouywl wrote: > >> I have implemented this feature in our env, Use ‘Init Container’ of >> docker to get URL of a jar file ,It seems a good idea. >> >> ouywl >> ou...@139.com >> >> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=ouywl&uid=ouywl%40139.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsma8dc7719018ba2517da7111b3db5a170.jpg&items=%5B%22ouywl%40139.com%22%5D> >> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81> 定制 >> >> On 11/19/2019 12:11,Thomas Weise wrote: >> >> There is a related use case (not specific to HDFS) that I came across: >> >> It would be nice if the jar upload endpoint could accept the URL of a jar >> file as alternative to the jar file itself. Such URL could point to an >> artifactory or distributed file system. >> >> Thomas >> >> >> On Mon, Nov 18, 2019 at 7:40 PM Yang Wang wrote: >> >>> Hi tison, >>> >>> Thanks for your starting this discussion. >>> * For user customized flink-dist jar, it is an useful feature. Since it >>> could avoid to upload the flink-dist jar >>> every time. Especially in production environment, it could accelerate the >>> submission process. >>> * For the standard flink-dist jar, FLINK-13938[1] could solve >>> the problem.Upload a official flink release >>> binary to distributed storage(hdfs) first, and then all the submission >>> could benefit from it. Users could >>> also upload the customized flink-dist jar to accelerate their submission. >>> >>> If the flink-dist jar could be specified to a remote path, maybe the user >>> jar have the same situation. >>> >>> [1]. https://issues.apache.org/jira/browse/FLINK-13938 >>> >>> tison 于2019年11月19日周二 上午11:17写道: >>> >>> > Hi forks, >>> > >>> > Recently, our customers ask for a feature configuring remote flink jar. >>> > I'd like to reach to you guys >>> > to see whether or not it is a general need. >>> > >>> > ATM Flink only supports configures local file as flink jar via `-yj` >>> > option. If we pass a HDFS file >>> > path, due to implementation detail it will fail with >>> > IllegalArgumentException. In the story we support >>> > configure remote flink jar, this limitation is eliminated. We also make >>> > use of YARN locality so that >>> > reducing uploading overhead, instead, asking YARN to localize the jar >>> on >>> > AM container started. >>> > >>> > Besides, it possibly has overlap with FLINK-13938. 
I'd like to put the >>> > discussion on our >>> > mailing list first. >>> > >>> > Are you looking forward to such a feature? >>> > >>> > @Yang Wang: this feature is different from that we discussed offline, >>> it >>> > only focuses on flink jar, not >>> > all ship files. >>> > >>> > Best, >>> > tison. >>> > >>> >>
Is it possible to recover from a checkpoint after modify program?
Hi, Here is our case: a job, reading data from Kafka, doing some processing and writing to HDFS, has been running for quite a while and has completed checkpoints. Now we'd like to add a new phase to the processing and want to recover from a checkpoint taken before the change. The new phase may or may not be stateful. Is it possible to do the recovery in each respective case? Best, tison.
Re: Change Akka Ask Timeout for Job Submission Only
In previous version there is an "akka.client.timeout" option but it is only used for timeout the future in client side so I don't think it change akka scope timeout. Best, tison. Abdul Qadeer 于2019年12月20日周五 上午10:44写道: > Hi! > > I am using Flink 1.8.3 and facing an issue where job submission through > RestClusterClient times out on Akka (default value 10s). In previous Flink > versions there was an option to set a different timeout value just for the > submission client (ClusterClient config), but looks like it is not honored > now as job submission from client is no more through Akka and it will use > the same value present with Dispatcher. I wanted to know how to increase > this timeout just for job submission without affecting other akka threads > in TaskManager/JobManager, or any other solution for the problem. > > The relevant stack trace is pasted below: > > "cause":{"commonElementCount":8,"localizedMessage":"Could not submit job > (JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit job > (JobID: > 26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed > to submit JobGraph.","message":"Failed to submit > JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal > server error., side:\nakka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. > Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat > akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat > scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat > java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server > side>]","message":"[Internal server error., side:\nakka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. 
> Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat > akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat > scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat > java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server > side>]","name":"org.apache.flink.runtime.rest.util.RestClientException","extendedStackTrace":[{"class":"org.apache.flink.runtime.rest.RestClient","method":"parseResponse","file":"RestClient.java","line":389,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rest.RestClient","method":"lambda$submitRequest$3","file":"RestClient.java","line":373,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"} >
Re: Change Akka Ask Timeout for Job Submission Only
IIRC this issue is possibly caused by resource limited or some occasional reasons. Ever I heard that someone upgrade Java version and the issue vanished. For "akka.ask.timeout", it is used for all akka ask requests timeout. And I second Yang that the timeout is irrelevant with client-server connection. Best, tison. Yang Wang 于2019年12月20日周五 上午11:02写道: > It seems that not because the timeout of rest client. It is a server side > akka timeout exception. > Could you share the jobmanager logs? > > Best, > Yang > > Abdul Qadeer 于2019年12月20日周五 上午10:59写道: > >> The relevant config here is "akka.ask.timeout". >> >> On Thu, Dec 19, 2019 at 6:51 PM tison wrote: >> >>> In previous version there is an "akka.client.timeout" option but it is >>> only used for timeout the future in client side so I don't think it change >>> akka scope timeout. >>> >>> Best, >>> tison. >>> >>> >>> Abdul Qadeer 于2019年12月20日周五 上午10:44写道: >>> >>>> Hi! >>>> >>>> I am using Flink 1.8.3 and facing an issue where job submission through >>>> RestClusterClient times out on Akka (default value 10s). In previous Flink >>>> versions there was an option to set a different timeout value just for the >>>> submission client (ClusterClient config), but looks like it is not honored >>>> now as job submission from client is no more through Akka and it will use >>>> the same value present with Dispatcher. I wanted to know how to increase >>>> this timeout just for job submission without affecting other akka threads >>>> in TaskManager/JobManager, or any other solution for the problem. >>>> >>>> The relevant stack trace is pasted below: >>>> >>>> "cause":{"commonElementCount":8,"localizedMessage":"Could not submit >>>> job (JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit >>>> job (JobID: >>>> 26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed >>>> to submit JobGraph.","message":"Failed to submit >>>> JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal >>>> server error., >>> side:\nakka.pattern.AskTimeoutException: Ask timed out on >>>> [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. >>>> Sender[null] sent message of type >>>> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat >>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat >>>> akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat >>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat >>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat >>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat >>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat >>>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat >>>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat >>>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat >>>> java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server >>>> side>]","message":"[Internal server error., >>> side:\nakka.pattern.AskTimeoutException: Ask timed out on >>>> [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. 
>>>> Sender[null] sent message of type >>>> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat >>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat >>>> akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat >>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat >>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat >>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:59
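For reference, a small sketch of how the knobs discussed in this thread could be raised; the string keys are the ones from the thread ("akka.ask.timeout" applies to all Akka ask requests, not just submission), and whether the REST-side "web.timeout" is also involved depends on the version, so treat this as an assumption to verify:

```java
import org.apache.flink.configuration.Configuration;

public class SubmissionTimeoutConfig {
    public static void main(String[] args) {
        Configuration config = new Configuration();

        // Server-side actor ask timeout (default "10 s"); raising it affects all ask requests.
        config.setString("akka.ask.timeout", "60 s");

        // REST handler timeout in milliseconds; may also bound the submit request in some versions.
        config.setString("web.timeout", "60000");

        // The same keys can simply be put into flink-conf.yaml on the JobManager side.
        System.out.println(config);
    }
}
```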
Re: Change Akka Ask Timeout for Job Submission Only
Forward to user list. Best, tison. Abdul Qadeer 于2019年12月20日周五 下午12:57写道: > Around submission time, logs from jobmanager: > > {"timeMillis":1576764854245,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Received > JobGraph submission 714829e8f6c8cd0daaed335c1b8c588a > (sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M > {"timeMillis":1576764854247,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Submitting > job 714829e8f6c8cd0daaed335c1b8c588a > (sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M > {"timeMillis":1576764856119,"thread":"flink-akka.actor.default-dispatcher-1036","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message > [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from > Actor[akka://flink/deadLetters] to > Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [87] > dead letters encountered. This logging can be turned off or adjusted with > configuration settings 'akka.log-dead-letters' and > 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1150,"threadPriority":5}^M > {"timeMillis":1576764877732,"thread":"flink-akka.actor.default-dispatcher-1039","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message > [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from > Actor[akka://flink/deadLetters] to > Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [88] > dead letters encountered. This logging can be turned off or adjusted with > configuration settings 'akka.log-dead-letters' and > 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1155,"threadPriority":5}^M > {"timeMillis":1576764877732,"thread":"flink-scheduler-1","level":"ERROR","loggerName":"org.apache.flink.runtime.rest.handler.job.JobSubmitHandler","message":"Unhandled > exception.","thrown":{"commonElementCount":0,"localizedMessage":"Ask timed > out on [Actor[akka://flink/user/dispatcher#1899316777]] after [1 ms]. > Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","message":"Ask > timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [1 > ms]. 
Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","name":"akka.pattern.AskTimeoutException","extendedStackTrace":[{"class":"akka.pattern.PromiseActorRef$$anonfun$1","method":"apply$mcV$sp","file":"AskSupport.scala","line":604,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.Scheduler$$anon$4","method":"run","file":"Scheduler.scala","line":126,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"unbatchedExecute","file":"Future.scala","line":601,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.BatchingExecutor$class","method":"execute","file":"BatchingExecutor.scala","line":109,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"execute","file&q
Re: Submit high version compiled code jar to low version flink cluster?
It may fail due to incompatibility. Flink doesn't promise such compatibility, but it MIGHT work. Best, tison. wangl...@geekplus.com.cn 于2019年12月30日周一 下午3:17写道: > > The flink cluster version is 1.8.2. > The application source code needs some features only supported in 1.9.1. > So it is compiled with the flink-1.9.1 dependency and built into a fat jar with > all the flink dependencies. > What will happen if I submit the high-version built jar to the low > version flink cluster? > > Thanks, > Lei > > > > >
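As a quick sanity check (a minimal sketch, not something the thread itself prescribes), you can log which Flink version is actually bundled inside the fat jar and compare it with the version the cluster's web UI reports:

import org.apache.flink.runtime.util.EnvironmentInformation;

public class BundledFlinkVersion {
    public static void main(String[] args) {
        // Prints the version of the flink-runtime classes on the classpath,
        // e.g. "1.9.1" when the fat jar was built against Flink 1.9.1.
        System.out.println("Bundled Flink version: " + EnvironmentInformation.getVersion());
    }
}

In general, building against the same minor version as the cluster (or marking the Flink dependencies as provided so the cluster supplies them) avoids shipping mismatched runtime classes in the fat jar.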
Re: jobgraph generation
If you are running a streaming job, follow this page[1] to obtain the JobGraph; then JsonPlanGenerator.generatePlan(jobGraph) gives you the JobGraph's JSON. Note that this is very internal logic without any compatibility guarantee. Best, tison. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos 张江 于2020年1月8日周三 上午11:01写道: > Hi all, > > By calling env.getExecutionPlan I can get the streamGraph in JSON format and feed it into the flink > visualizer to get a visualization. Now I want to get the jobGraph in a similar way and generate a JSON file for it, but I don't know how to do that. > > There seems to be no direct API in flink to call, but in the flink web ui I can see the chained execution graph of the job, as shown in the picture below. How is that generated? > > > Thanks > > 张江 > Email: zjkingdom2...@163.com >
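To make the internal route above concrete, here is a minimal sketch written against the 1.8/1.9 APIs discussed in this thread (these are internal classes, so package names and behavior may shift between Flink versions):

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class JobGraphJsonExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Any pipeline works; this one only exists so a StreamGraph can be built.
        env.fromElements("a", "b", "c").print();

        // StreamGraph -> JobGraph -> JSON plan; all internal, no compatibility guarantees.
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph();
        String jsonPlan = JsonPlanGenerator.generatePlan(jobGraph);

        System.out.println(jsonPlan);
    }
}

For anything that must survive version upgrades, prefer the REST API mentioned later in this thread.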
Re: jobgraph generation
Hi Zhang, I just noticed that this was sent to the user list. Please send to the user-zh list (in cc) next time if you want to discuss in Chinese. Best, tison. tison 于2020年1月8日周三 上午11:06写道: > If you are running a streaming job, follow this page[1] to obtain the JobGraph; then > > JsonPlanGenerator.generatePlan(jobGraph) > > gives you the JobGraph's JSON. Note that this is very internal logic without any compatibility guarantee. > > Best, > tison. > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos > > > 张江 于2020年1月8日周三 上午11:01写道: > >> Hi all, >> >> By calling env.getExecutionPlan I can get the streamGraph in JSON format and feed it into the flink >> visualizer to get a visualization. Now I want to get the jobGraph in a similar way and generate a JSON file for it, but I don't know how to do that. >> >> There seems to be no direct API in flink to call, but in the flink web >> ui I can see the chained execution graph of the job, as shown in the picture below. How is that generated? >> >> >> Thanks >> >> 张江 >> Email: zjkingdom2...@163.com >> >
Re: jobgraph generation
A public way to get the JSON plan of a JobGraph is, with an existing Flink cluster, to use the REST API JarPlan[1]. Best, tison. [1] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#jars-jarid-plan tison 于2020年1月8日周三 上午11:08写道: > Hi Zhang, > > I just noticed that this was sent to the user list. Please send to the user-zh list (in > cc) next time if you want to discuss in Chinese. > > Best, > tison. > > > tison 于2020年1月8日周三 上午11:06写道: > >> If you are running a streaming job, follow this page[1] to obtain the JobGraph; then >> >> JsonPlanGenerator.generatePlan(jobGraph) >> >> gives you the JobGraph's JSON. Note that this is very internal logic without any compatibility guarantee. >> >> Best, >> tison. >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos >> >> >> 张江 于2020年1月8日周三 上午11:01写道: >> >>> Hi all, >>> >>> By calling env.getExecutionPlan I can get the streamGraph in JSON format and feed it into the flink >>> visualizer to get a visualization. Now I want to get the jobGraph in a similar way and generate a JSON file for it, but I don't know how to do that. >>> >>> There seems to be no direct API in flink to call, but in the flink web >>> ui I can see the chained execution graph of the job, as shown in the picture below. How is that generated? >>> >>> >>> Thanks >>> >>> 张江 >>> Email: zjkingdom2...@163.com >>> >>
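A rough sketch of calling that endpoint from Java (assuming Java 11+ for the HTTP client; the JobManager address and jar id are placeholders, and the jar must first be uploaded via POST /jars/upload):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JarPlanExample {
    public static void main(String[] args) throws Exception {
        // Placeholder values: adjust the JobManager address and use the
        // jar id returned by the /jars/upload endpoint.
        String jobManager = "http://localhost:8081";
        String jarId = "<jar-id-returned-by-upload>";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(jobManager + "/jars/" + jarId + "/plan"))
                .GET()
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // The response body is the JSON plan of the JobGraph.
        System.out.println(response.body());
    }
}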
Re: [ANNOUNCE] Dian Fu becomes a Flink committer
Congratulations! Dian Best, tison. Zhu Zhu 于2020年1月17日周五 上午10:47写道: > Congratulations Dian. > > Thanks, > Zhu Zhu > > hailongwang <18868816...@163.com> 于2020年1月17日周五 上午10:01写道: > >> >> Congratulations Dian ! >> >> Best, >> Hailong Wang >> >> >> >> >> 在 2020-01-16 21:15:34,"Congxian Qiu" 写道: >> >> Congratulations Dian Fu >> >> Best, >> Congxian >> >> >> Jark Wu 于2020年1月16日周四 下午7:44写道: >> >>> Congratulations Dian and welcome on board! >>> >>> Best, >>> Jark >>> >>> On Thu, 16 Jan 2020 at 19:32, Jingsong Li >>> wrote: >>> >>> > Congratulations Dian Fu. Well deserved! >>> > >>> > Best, >>> > Jingsong Lee >>> > >>> > On Thu, Jan 16, 2020 at 6:26 PM jincheng sun >> > >>> > wrote: >>> > >>> >> Congrats Dian Fu and welcome on board! >>> >> >>> >> Best, >>> >> Jincheng >>> >> >>> >> Shuo Cheng 于2020年1月16日周四 下午6:22写道: >>> >> >>> >>> Congratulations! Dian Fu >>> >>> >>> >>> > Xingbo Wei Zhong 于2020年1月16日周四 下午6:13写道: >> jincheng sun >>> >>> 于2020年1月16日周四 下午5:58写道: >>> >>> >>> >> >>> > >>> > -- >>> > Best, Jingsong Lee >>> > >>> >> >> >> >> >> >
Re: Replacing a server in Zookeeper Quorum
I second Yang that setting a static IP for the EMR master node would be a workaround. Even in the ZooKeeper world, dynamic reconfiguration is a new and still-maturing feature introduced in 3.5.3, while Flink uses ZooKeeper 3.4.x. It would be a breaking change if we "just" upgraded the zk version, but hopefully the Flink community keeps digging for a safe upgrade path. Best, tison. Yang Wang 于2020年1月22日周三 上午10:34写道: > Hi Aaron, > > I think it is not the responsibility of Flink. Flink uses the zookeeper > curator client to connect to the zk servers. If > multiple zk servers are specified, it has an automatic retry mechanism. > However, your problem is that the ip address will change when the EMR instance > restarts. Currently, Flink > cannot dynamically reload its configuration. One quick solution is > to use a static ip for the EMR > master node[1]. > > > Best, > Yang > > > [1]. > https://aws.amazon.com/premiumsupport/knowledge-center/static-private-ip-master-node-emr/?nc1=h_ls > > Aaron Langford 于2020年1月22日周三 上午1:48写道: > >> Hello Flink Community, >> >> I'm working on a HA setup of Flink 1.8.1 on AWS EMR and have some >> questions about how Flink interacts with Zookeeper when one of the servers >> in the quorum specified in flink-conf.yaml goes down and is replaced by a >> machine with a new IP address. >> >> Currently, I configure high-availability.zookeeper.quorum to be the IP >> addresses of the 3 master nodes of the EMR cluster, as this is what AWS >> does to enable a highly available YARN setup. >> >> EMR master nodes may go down entirely and need to be replaced by a >> machine with a different instance IP address. I will almost certainly need >> to perform a rolling configuration update to account for this. But will I >> need to restart flink for this to take effect? Is there a way to >> dynamically reload these configs when they change? >> >> Aaron >> >
Re: Replacing a server in Zookeeper Quorum
Good to know :-) Best, tison. Aaron Langford 于2020年1月22日周三 上午10:44写道: > My apologies, I ended up resolving this through experimentation. AWS > replaces master nodes with the same internal DNS names, so configurations > need not be changed. > > Aaron > > > On Tue, Jan 21, 2020, 6:33 PM Yang Wang wrote: > >> Hi Aaron, >> >> I think it is not the responsibility of Flink. Flink uses zookeeper >> curator to connect the zk server. If >> multiple zk server are specified, it has an automatic retry mechanism. >> However, your problem is ip address will change when the EMR instance >> restarts. Currently, Flink >> can not support dynamically loading configuration. One quick solution is >> to use a static ip for EMR >> master node[1]. >> >> >> Best, >> Yang >> >> >> [1]. >> https://aws.amazon.com/premiumsupport/knowledge-center/static-private-ip-master-node-emr/?nc1=h_ls >> >> Aaron Langford 于2020年1月22日周三 上午1:48写道: >> >>> Hello Flink Community, >>> >>> I'm working on a HA setup of Flink 1.8.1 on AWS EMR and have some >>> questions about how Flink interacts with Zookeeper when one of the servers >>> in the quorum specified in flink-conf.yaml goes down and is replaced by a >>> machine with a new IP address. >>> >>> Currently, I configure high-availability.zookeeper.quorum to be the IP >>> addresses of the 3 master nodes of the EMR cluster, as this is what AWS >>> does to enable a highly available YARN setup. >>> >>> EMR master nodes may go down entirely and need to be replaced by a >>> machine with a different instance IP address. I will almost certainly need >>> to perform a rolling configuration update to account for this. But will I >>> need to restart flink for this to take effect? Is there a way to >>> dynamically reload these configs when they change? >>> >>> Aaron >>> >>
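Following up on the resolution above (a minimal sketch, not taken from the thread itself): since the quorum keys accept hostnames, pointing them at the masters' stable internal DNS names rather than raw IPs is what makes a replaced node a non-event. The hostnames below are hypothetical, and the same keys normally live in flink-conf.yaml:

import org.apache.flink.configuration.Configuration;

public class ZooKeeperHaConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Hypothetical stable DNS names for the three EMR master nodes.
        // Because the names survive a node replacement, the quorum list
        // does not need a rolling update when an instance IP changes.
        conf.setString("high-availability", "zookeeper");
        conf.setString("high-availability.zookeeper.quorum",
                "emr-master-1.internal:2181,emr-master-2.internal:2181,emr-master-3.internal:2181");

        System.out.println(conf);
    }
}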
Re: Flink Job Submission Fails even though job is running
This is a known issue reported multiple times: if you are on an early JDK 1.8.x version, upgrade to a later bugfix release and the issue will vanish. I have never had a JobManager-side log when this issue was reported, so I'm sorry I can't explain more... Best, tison. Yang Wang 于2020年1月22日周三 上午10:46写道: > The "web.timeout" will be used for all web monitor asynchronous > operations, including the > "DispatcherGateway.submitJob" in the "JobSubmitHandler". > So when you increase the timeout, does it still not work? > > Best, > Yang > > satya brat 于2020年1月21日周二 下午8:57写道: > >> How does web.timeout help here? The issue is with respect to the akka >> dispatcher timing out. The job is submitted to the task managers but the >> response doesn't reach the client. >> >> On Tue, Jan 21, 2020 at 12:34 PM Yang Wang wrote: >> >>> Hi satya, >>> >>> Maybe the job has been submitted to the Dispatcher successfully and the >>> internal job submission takes >>> too long (more than 10s), so it failed with a timeout. Could you >>> please set `web.timeout: 3` >>> and run again? >>> >>> >>> >>> Best, >>> Yang >>> >>> satya brat 于2020年1月20日周一 下午4:34写道: >>> >>>> I am using a standalone cluster of Flink with 1 jobManager and n >>>> taskManagers. When I try to submit a job via the command line, the job >>>> submission fails with the error message >>>> org.apache.flink.client.program.ProgramInvocationException: Could not >>>> submit job (JobID: f839aefee74aa4483ce8f8fd2e49b69e). >>>> >>>> On the jobManager instance, everything works fine till the job is switched >>>> from DEPLOYING to RUNNING. Post that, once the akka timeout expires, I see the >>>> following stacktrace >>>> >>>> akka.pattern.AskTimeoutException: Ask timed out on >>>> [Actor[akka://flink/user/dispatcher#-177004106]] after [10 ms]. >>>> Sender[null] sent message of type >>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". >>>> at >>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) >>>> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) >>>> at >>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) >>>> at >>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) >>>> at >>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) >>>> at >>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) >>>> at >>>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) >>>> at >>>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) >>>> at >>>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) >>>> at java.lang.Thread.run(Thread.java:745) >>>> >>>> I went through the flink code on github and all the steps required to >>>> execute a job seem to be running fine. However, when the jobManager has to >>>> give the job submission ack to the flink client that triggered the job, the >>>> jobSubmitHandler times out on the akka dispatcher that, according to my >>>> understanding, takes care of communicating with the job client. >>>> >>>> The Flink job consists of 1 source (kafka), 2 operators and 1 >>>> sink (custom sink). The following link shows the jobManager logs: >>>> https://pastebin.com/raw/3GaTtNrG >>>> >>>> Once the dispatcher times out, all other Flink UI calls also time out >>>> with the same exception. >>>> >>>> Following are the flink client logs used to submit the job via the >>>> command line.
>>>> >>>> 2019-09-28 19:34:21,321 INFO org.apache.flink.client.cli.CliFrontend >>>> - >>>> >>>> 2019-09-28 19:34:21,322 INFO org.apache.flink.client.cli.CliFrontend >>>> - Starting Command Line Client (Version: 1.8.0, >>>> Rev:, Date:) >>>> 2019-09-28 19:34:21,322 INFO org.apache.flink.client.cli.CliFrontend >>
Re: Flink Job Submission Fails even though job is running
I guess it is a JM-internal error that crashes the dispatcher, or a race condition that leaves the returned future never completed, possibly related to a JDK bug. But again, I have never had a log for this case, so I cannot conclude anything. Best, tison. tison 于2020年1月22日周三 上午10:49写道: > This is a known issue reported multiple times: if you are on an early > JDK 1.8.x version, upgrade to a later bugfix release and the issue will vanish. > > I have never had a JobManager-side log when this issue was reported, so I'm sorry > I can't explain more... > > Best, > tison. > > > Yang Wang 于2020年1月22日周三 上午10:46写道: > >> The "web.timeout" will be used for all web monitor asynchronous >> operations, including the >> "DispatcherGateway.submitJob" in the "JobSubmitHandler". >> So when you increase the timeout, does it still not work? >> >> Best, >> Yang >> >> satya brat 于2020年1月21日周二 下午8:57写道: >> >>> How does web.timeout help here? The issue is with respect to the akka >>> dispatcher timing out. The job is submitted to the task managers but the >>> response doesn't reach the client. >>> >>> On Tue, Jan 21, 2020 at 12:34 PM Yang Wang >>> wrote: >>> >>>> Hi satya, >>>> >>>> Maybe the job has been submitted to the Dispatcher successfully and the >>>> internal job submission takes >>>> too long (more than 10s), so it failed with a timeout. Could you >>>> please set `web.timeout: 3` >>>> and run again? >>>> >>>> >>>> >>>> Best, >>>> Yang >>>> >>>> satya brat 于2020年1月20日周一 下午4:34写道: >>>> >>>>> I am using a standalone cluster of Flink with 1 jobManager and n >>>>> taskManagers. When I try to submit a job via the command line, the job >>>>> submission fails with the error message >>>>> org.apache.flink.client.program.ProgramInvocationException: Could not >>>>> submit job (JobID: f839aefee74aa4483ce8f8fd2e49b69e). >>>>> >>>>> On the jobManager instance, everything works fine till the job is switched >>>>> from DEPLOYING to RUNNING. Post that, once the akka timeout expires, I see the >>>>> following stacktrace >>>>> >>>>> akka.pattern.AskTimeoutException: Ask timed out on >>>>> [Actor[akka://flink/user/dispatcher#-177004106]] after [10 ms]. >>>>> Sender[null] sent message of type >>>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". >>>>> at >>>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) >>>>> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) >>>>> at >>>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) >>>>> at >>>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) >>>>> at >>>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) >>>>> at >>>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) >>>>> at >>>>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) >>>>> at >>>>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) >>>>> at >>>>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) >>>>> at java.lang.Thread.run(Thread.java:745) >>>>> >>>>> I went through the flink code on github and all the steps required to >>>>> execute a job seem to be running fine. However, when the jobManager has to >>>>> give the job submission ack to the flink client that triggered the job, the >>>>> jobSubmitHandler times out on the akka dispatcher that, according to my >>>>> understanding, takes care of communicating with the job client.
>>>>> >>>>> The Flink job consists for 1 Source (kafka), 2 operators and 1 >>>>> sink(Custom Sink). Following link shows the jobManager logs: >>>>> https://pastebin.com/raw/3GaTtNrG >>>>> >>>>> Once the dispatcher times out, all other Flink UI calls also timeout >>>>> with same exception. >>>>> >>>>
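While debugging cases like this one, the timeout knobs mentioned above can be raised; a minimal hedged sketch via Flink's Configuration API follows (example values only; the same keys are usually set in flink-conf.yaml). If the root cause is the early-JDK-1.8.x issue noted above, raising timeouts only hides the symptom and a JDK upgrade is the actual fix.

import org.apache.flink.configuration.Configuration;

public class SubmissionTimeoutSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // web.timeout covers asynchronous web monitor operations such as
        // DispatcherGateway.submitJob in the JobSubmitHandler (milliseconds).
        conf.setString("web.timeout", "300000");

        // akka.ask.timeout is the default timeout for internal ask-style RPCs.
        conf.setString("akka.ask.timeout", "60 s");

        System.out.println(conf);
    }
}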
Re: Flink HA for Job Cluster
Hi Krzysztof, Flink doesn't provide JobManager HA by itself yet. For YARN deployments, you can rely on the yarn.application-attempts configuration[1]; for Kubernetes deployments, Flink relies on a Kubernetes Deployment to restart a failed JM. However, such a standalone mode doesn't tolerate JM failure, and the strategies above just restart the application, which means all tasks will be killed and restarted. Best, tison. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html#configuration-1 KristoffSC 于2020年2月7日周五 下午11:34写道: > Hi, > In [1] we can find the setup for Standalone and YARN clusters to achieve > Job Manager HA. > > Is Standalone Cluster High Availability with ZooKeeper the same approach > to use for Docker's Job Cluster approach with Kubernetes? > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html > > Thanks, > Krzysztof > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >
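To illustrate the YARN knob referenced above, here is a hedged sketch using Flink's Configuration API (the attempt count and paths are example values only; these keys normally go into flink-conf.yaml, and restarted JobManagers only recover jobs if HA metadata is stored durably):

import org.apache.flink.configuration.Configuration;

public class YarnJmRestartSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Let YARN restart the application master (the JobManager) a few
        // times before failing the application. Example value only.
        conf.setString("yarn.application-attempts", "3");

        // A restarted JM can only resume jobs if HA state lives outside the JM,
        // e.g. ZooKeeper for leader/metadata plus a durable storage directory.
        conf.setString("high-availability", "zookeeper");
        conf.setString("high-availability.storageDir", "hdfs:///flink/ha");

        System.out.println(conf);
    }
}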
Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
Congrats Jingsong, well deserved! Best, tison. Xintong Song 于2020年2月21日周五 上午11:09写道: > Congratulations, Jingsong. Well deserved~! > > > Thank you~ > > Xintong Song > > > > On Fri, Feb 21, 2020 at 11:05 AM Kurt Young wrote: > >> Hi everyone, >> >> I'm very happy to announce that Jingsong Lee accepted the offer of the >> Flink PMC to >> become a committer of the Flink project. >> >> Jingsong Lee has been an active community member for more than a year >> now. He is >> mainly focused on Flink SQL, played an essential role during the blink planner >> merging, drove >> FLIP-63 and helped rework the expression design, and also >> implemented >> and fixed lots of features and bugs in Flink SQL. Moreover, he is very >> active in both the dev >> and user mailing lists, helped discuss designs and answer users' >> questions, and also >> helped to verify various releases. >> >> Congratulations Jingsong! >> >> Best, Kurt >> (on behalf of the Flink PMC) >> >> >>
Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.0.0 released
Congrats! Thank you all for making this release happen. Best, tison. rui fan <1996fan...@gmail.com> 于2022年6月5日周日 17:19写道: > Thanks Yang for driving the release, and thanks to > all contributors for making this release happen! > > Best wishes > Rui Fan > > On Sun, Jun 5, 2022 at 4:14 PM Yang Wang wrote: > > > The Apache Flink community is very happy to announce the release of > Apache > > Flink Kubernetes Operator 1.0.0. > > > > The Flink Kubernetes Operator allows users to manage their Apache Flink > > applications and their lifecycle through native k8s tooling like kubectl. > > This is the first production ready release and brings numerous > > improvements and new features to almost every aspect of the operator. > > > > Please check out the release blog post for an overview of the release: > > > > > https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html > > > > The release is available for download at: > > https://flink.apache.org/downloads.html > > > > Maven artifacts for Flink Kubernetes Operator can be found at: > > > > > https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator > > > > Official Docker image for Flink Kubernetes Operator applications can be > > found at: > > https://hub.docker.com/r/apache/flink-kubernetes-operator > > > > The full release notes are available in Jira: > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351500 > > > > We would like to thank all contributors of the Apache Flink community who > > made this release possible! > > > > Regards, > > Gyula & Yang > > >