打扰了,解决了,原因是因为启动时没有配置savepoint路径。
> 2022年4月30日 12:09,Arthur Li <lianyou1...@126.com> 写道: > > 大家好, > > > 我在学习Flink checkpoint时,做了一个示例没有得到期望结果,麻烦帮忙看看是哪里设置有问题。谢谢 > 1. 启动checkpoint > 2. 设置statebackend为FsStateBackend > 3. 从socketTextStream读取数据,统计单词个数 > (“hello”, 5), (“world”, 1) > 4. 通过触发异常,来模拟终止程序 > 5. 重新启动程序,那么启动之后的统计数据的初始值应该是上一次checkpoint 成功存储的值 > (“hello”, 5), (“world”, 1) , 那么再次输入hello, 应该输出(“hello”, 6) > 而在实际输出结果为(“hello”, 1) > > 环境和版本信息 > 1. MacOS - Oracle JDK 1.8 > 2. 版本信息 > <properties> > <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> > <flink.version>1.11.6</flink.version> > <flink.lang>scala</flink.lang> > <target.java.version>1.8</target.java.version> > <scala.binary.version>2.12</scala.binary.version> > <maven.compiler.source>${target.java.version}</maven.compiler.source> > <maven.compiler.target>${target.java.version}</maven.compiler.target> > <log4j.version>2.12.1</log4j.version> > </properties> > > 代码 > object RestartStrategyFsStateBackend { > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > > env.enableCheckpointing(1000L) > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2)) > > val backendPath = > "file:///Users/arthur/Documents/Workspace/java/quickstart" + > "/flink-spring/src/main/resources/backend.out/restartstrategyv3" > env.setStateBackend(new FsStateBackend(backendPath)) > > // socket数据源 > env.socketTextStream("localhost", 7077) > .map(value => { > if (value == "restart") { > throw new RuntimeException("restart is triggered, oooops~~~~~") > } > (value, 1) > } > ) > .keyBy(_._1) > .sum(1) > .print("RestartStrategy") > > env.execute("RestartStrategy") > } > } > > BR. > Arthur > > > > >