Hi all,

I've run into a problem and would appreciate some help. I set the state storage location with env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints")), but once the job is running I cannot find any state data.

I'm on Flink 1.12 in YARN per-job mode. My configuration is below. After the job starts, the directory "/data/flink/checkpoints" does not exist on the server. Since I have set a storage location for state, shouldn't state data appear there as soon as the job is running?

    public class FlinkTestDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint every 60 s.
            env.enableCheckpointing(60000);
            env.getConfig().setAutoWatermarkInterval(200);
            // RocksDB state backend with checkpoints written to a file:// URI.
            env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints"));

            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
            bsTableEnv.getConfig().getConfiguration().set(
                    ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);

            // Keep externalized checkpoints when the job is cancelled.
            CheckpointConfig config = env.getCheckpointConfig();
            config.enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            bsTableEnv.getConfig().getConfiguration().set(
                    ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));

            // Mini-batch settings for the Table API.
            Configuration configuration = bsTableEnv.getConfig().getConfiguration();
            configuration.setString("table.exec.mini-batch.enabled", "true");
            configuration.setString("table.exec.mini-batch.allow-latency", "6000");
            configuration.setString("table.exec.mini-batch.size", "5000");
            // (the rest of the job is not shown)
        }
    }

刘海
liuha...@163.com
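For anyone trying to reproduce this: a file:// URI refers to the local filesystem of whichever machine the Flink processes run on, and Flink lays checkpoints out as <checkpoint-dir>/<job-id>/chk-<n>. The following is a minimal sketch, using only the JDK, of a check that could be run on a given node to see whether any chk-* directories exist under the configured path. The class name CheckpointDirCheck and the method findCheckpointDirs are hypothetical helpers made up for this illustration; they are not part of Flink.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: inspects a local directory only.
public class CheckpointDirCheck {

    /**
     * Returns the chk-* directories found at most two levels below root
     * (that is, root/<job-id>/chk-<n>), as paths relative to root.
     * Returns an empty list if root is missing or is not a directory.
     */
    static List<String> findCheckpointDirs(Path root) throws IOException {
        if (!Files.isDirectory(root)) {
            return Collections.emptyList();
        }
        List<String> found = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(root, 2)) {
            paths.filter(Files::isDirectory)
                 .filter(p -> p.getFileName() != null
                         && p.getFileName().toString().startsWith("chk-"))
                 .forEach(p -> found.add(root.relativize(p).toString()));
        }
        Collections.sort(found);
        return found;
    }

    public static void main(String[] args) throws IOException {
        // Same URI as in the job above; it resolves on the machine running this check.
        Path root = Paths.get(URI.create("file:///data/flink/checkpoints"));
        List<String> dirs = findCheckpointDirs(root);
        if (dirs.isEmpty()) {
            System.out.println("No chk-* directories under " + root + " on this machine");
        } else {
            dirs.forEach(d -> System.out.println("Found: " + d));
        }
    }
}
```

An empty result on one machine only says that nothing was written there; it says nothing about other nodes in the YARN cluster, and it may simply mean that no checkpoint has completed yet.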