[ https://issues.apache.org/jira/browse/FLINK-11738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
thinktothings updated FLINK-11738: ---------------------------------- Description: akka.ask.timeout is 10 seconds; in the MiniCluster environment this value is hard-coded. Is there no way to change it? --------------------------------------------------------------------------------------------------------------------------------- org.apache.flink.runtime.minicluster.MiniCluster /** * Creates a new Flink mini cluster based on the given configuration. * * @param miniClusterConfiguration The configuration for the mini cluster */ public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) { this.miniClusterConfiguration = checkNotNull(miniClusterConfiguration, "config may not be null"); this.rpcTimeout = Time.seconds(10L); this.terminationFuture = CompletableFuture.completedFuture(null); running = false; } --------------------------------------------------------------------------------------------------------------------------------- !image-2019-02-25-13-11-13-723.png! --------------------------------------------------------------------------------------------------------------------------------- package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time /** * nc -lk 1234 输入数据 */ object SocketWindowWordCount { def main(args: Array[String]): Unit = { val port = 1234 // get the execution environment val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // get input data by connecting to the socket val dataStream = env.socketTextStream("localhost", port, '\n') import org.apache.flink.streaming.api.scala._ val textResult = dataStream.flatMap( w => w.split(" s") ).map( w => WordWithCount(w,1)) .keyBy("word") /** * 每5秒刷新一次,相当于重新开始计数, * 好处,不需要一直拿所有的数据统计 * 只需要在指定时间间隔内的增量数据,减少了数据规模 */ .timeWindow(Time.seconds(5)) .sum("count" ) textResult.print().setParallelism(1) if(args == null || args.size ==0) { 
env.execute("默认作业") } else { env.execute(args(0)) } println("结束") } // Data type for words with count case class WordWithCount(word: String, count: Long) } --------------------------------------------------------------------------------------------------------------------------------- was: Akka.ask.timeout 10 seconds, this miniCluster environment is written dead, can not be changed? --------------------------------------------------------------------------------------------------------------------------------- org.apache.flink.runtime.minicluster.MiniCluster /** * Creates a new Flink mini cluster based on the given configuration. * * @param miniClusterConfiguration The configuration for the mini cluster */ public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) { this.miniClusterConfiguration = checkNotNull(miniClusterConfiguration, "config may not be null"); this.rpcTimeout = Time.seconds(10L); this.terminationFuture = CompletableFuture.completedFuture(null); running = false; } --------------------------------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------------------------------- package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time /** * nc -lk 1234 输入数据 */ object SocketWindowWordCount { def main(args: Array[String]): Unit = { val port = 1234 // get the execution environment val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // get input data by connecting to the socket val dataStream = env.socketTextStream("localhost", port, '\n') import org.apache.flink.streaming.api.scala._ val textResult = dataStream.flatMap( w => w.split(" s") ).map( w => WordWithCount(w,1)) .keyBy("word") /** * 每5秒刷新一次,相当于重新开始计数, * 
好处,不需要一直拿所有的数据统计 * 只需要在指定时间间隔内的增量数据,减少了数据规模 */ .timeWindow(Time.seconds(5)) .sum("count" ) textResult.print().setParallelism(1) if(args == null || args.size ==0) { env.execute("默认作业") } else { env.execute(args(0)) } println("结束") } // Data type for words with count case class WordWithCount(word: String, count: Long) } --------------------------------------------------------------------------------------------------------------------------------- > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher15e85f5d-a55d-4773-8197-f0db5658f55b#1335897563]] > after [10000 ms]. Sender[null] sent > -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-11738 > URL: https://issues.apache.org/jira/browse/FLINK-11738 > Project: Flink > Issue Type: Bug > Components: Client > Affects Versions: 1.7.2 > Environment: flink 1.7.2 client > !image-2019-02-25-10-57-20-106.png! > > !image-2019-02-25-10-57-32-876.png! > > > !image-2019-02-25-10-57-39-753.png! > > > > > > Reporter: thinktothings > Priority: Major > Attachments: image-2019-02-25-13-11-13-723.png > > > Akka.ask.timeout 10 seconds, this miniCluster environment is written dead, > can not be changed? > --------------------------------------------------------------------------------------------------------------------------------- > org.apache.flink.runtime.minicluster.MiniCluster > /** > * Creates a new Flink mini cluster based on the given configuration. 
> * > * @param miniClusterConfiguration The configuration for the mini cluster > */ > public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) \{ > this.miniClusterConfiguration = checkNotNull(miniClusterConfiguration, > "config may not be null"); this.rpcTimeout = Time.seconds(10L); > this.terminationFuture = CompletableFuture.completedFuture(null); running = > false; } > --------------------------------------------------------------------------------------------------------------------------------- > !image-2019-02-25-13-11-13-723.png! > > > --------------------------------------------------------------------------------------------------------------------------------- > > package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment > import org.apache.flink.streaming.api.windowing.time.Time > /** > * nc -lk 1234 输入数据 > */ > object SocketWindowWordCount { > def main(args: Array[String]): Unit = { > val port = 1234 > // get the execution environment > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > // get input data by connecting to the socket > val dataStream = env.socketTextStream("localhost", port, '\n') > import org.apache.flink.streaming.api.scala._ > val textResult = dataStream.flatMap( w => w.split(" > s") ).map( w => WordWithCount(w,1)) > .keyBy("word") > /** > * 每5秒刷新一次,相当于重新开始计数, > * 好处,不需要一直拿所有的数据统计 > * 只需要在指定时间间隔内的增量数据,减少了数据规模 > */ > .timeWindow(Time.seconds(5)) > .sum("count" ) > textResult.print().setParallelism(1) > if(args == null || args.size ==0) > { env.execute("默认作业") } > else > { env.execute(args(0)) } > println("结束") > } > // Data type for words with count > case class WordWithCount(word: String, count: Long) > } > > --------------------------------------------------------------------------------------------------------------------------------- -- This message was sent by Atlassian JIRA 
(v7.6.3#76005)