Another point: the JM had terminated. The restart policy on K8s for the Job Cluster is spec: restartPolicy: OnFailure.
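(For illustration only: a minimal sketch of a K8s Job manifest using that restart policy. The names, image, job class, and savepoint path below are placeholders/assumptions, not the actual manifest behind this thread.)

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-job-cluster            # placeholder name
spec:
  template:
    spec:
      restartPolicy: OnFailure       # K8s restarts the container when it exits non-zero
      containers:
        - name: jobmanager
          image: my-flink-job:1.8    # placeholder image with the job jar baked in
          args:
            - "job-cluster"
            - "--job-classname"
            - "com.example.KafkaToHdfsJob"     # hypothetical job class
            - "--fromSavepoint"
            - "hdfs:///path/to/savepoint"      # assumed: the savepoint the job was originally started with

With OnFailure, a non-zero exit like the one below brings the same container spec back up with the same arguments, savepoint flag included.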
2019-06-29 00:33:14,308 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Terminating cluster entrypoint process StandaloneJobClusterEntryPoint with exit code 1443.

On Sat, Jun 29, 2019 at 9:04 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> I have not tried on bare metal. We have no option but k8s.
>
> And this is a job cluster.
>
> On Sat, Jun 29, 2019 at 9:01 AM Timothy Victor <vict...@gmail.com> wrote:
>
>> Hi Vishal, can this be reproduced on a bare metal instance as well?
>> Also, is this a job or a session cluster?
>>
>> Thanks
>>
>> Tim
>>
>> On Sat, Jun 29, 2019, 7:50 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> OK, this happened again, and it is bizarre (and is definitely not what I think should happen).
>>>
>>> The job failed and I see these logs (in essence it is keeping the last 5 externalized checkpoints) but deleting the ZK checkpoints directory:
>>>
>>> 2019-06-29 00:33:13,736 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint - Checkpoint with ID 5654 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5654' not discarded.
>>> 2019-06-29 00:33:13,786 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint - Checkpoint with ID 5655 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5655' not discarded.
>>> 2019-06-29 00:33:13,836 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint - Checkpoint with ID 5656 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5656' not discarded.
>>> 2019-06-29 00:33:13,886 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint - Checkpoint with ID 5657 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5657' not discarded.
>>> 2019-06-29 00:33:13,936 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint - Checkpoint with ID 5658 at 'xxxxxxxxxx:8020/analytics_eng/kafka-to-hdfs-states/00000000000000000000000000000005/chk-5658' not discarded.
>>> 2019-06-29 00:33:13,936 INFO  org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore - Removing /kafka-to-hdfs-v2/kafka-to-hdfs-v2/k8s/checkpoints/00000000000000000000000000000005 from ZooKeeper
>>>
>>> The job restarts, and this is bizarre: it does not find the ZK checkpoint directory, but instead of going to state.checkpoints.dir to get its last checkpoint, it restarts from the savepoint we started this job with about 15 days ago (resetting the checkpoint ID):
>>>
>>> 2019-06-29 00:33:20,045 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
>>> 2019-06-29 00:33:20,051 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 0 checkpoints from storage.
>>> 2019-06-29 00:33:20,051 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 0 checkpoints in ZooKeeper.
>>> 2019-06-29 00:33:20,053 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job 00000000000000000000000000000005 from savepoint hdfs://nn-crunchy:8020/flink-savepoints_k8s/prod/kafka-to-hdfs/savepoint-000000-128f419cdc6f ()
>>> 2019-06-29 00:33:20,538 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job 00000000000000000000000000000005 to 4203.
>>> 2019-06-29 00:33:20,538 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
>>> 2019-06-29 00:33:20,549 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 4202.
>>> 2019-06-29 00:33:20,548 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 1 checkpoints in ZooKeeper.
>>> 2019-06-29 00:33:20,549 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 1 checkpoints from storage.
>>>
>>> This just does not make sense....
>>>
>>> On Wed, Jun 5, 2019 at 9:29 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> Ok, I will do that.
>>>>
>>>> On Wed, Jun 5, 2019, 8:25 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>
>>>>> Can you provide us the jobmanager logs?
>>>>>
>>>>> After the first restart the JM should have started deleting older checkpoints as new ones were created.
>>>>> After the second restart the JM should have recovered all 10 checkpoints, started from the latest, and started pruning old ones as new ones were created.
>>>>>
>>>>> So you're running into 2 separate issues here, which is a bit odd.
>>>>>
>>>>> On 05/06/2019 13:44, Vishal Santoshi wrote:
>>>>>
>>>>> Any one?
>>>>>
>>>>> On Tue, Jun 4, 2019, 2:41 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> The above is flink 1.8.
>>>>>>
>>>>>> On Tue, Jun 4, 2019 at 12:32 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> I had a sequence of events that created this issue.
>>>>>>>
>>>>>>> * I started a job with state.checkpoints.num-retained: 5.
>>>>>>>
>>>>>>> * As expected, I have the 5 latest checkpoints retained in my HDFS backend.
>>>>>>>
>>>>>>> * The JM dies (K8s limit etc.) without cleaning the HDFS directory. The K8s job restores from the latest checkpoint (I think), but as it creates new checkpoints it does not delete the older ones. At the end there are now 10 checkpoints: 5 from the old run, which remain static, and the 5 latest representing the ongoing pipe.
>>>>>>>
>>>>>>> * The JM dies again and restarts from the latest of the 5 old checkpoints.
>>>>>>>
>>>>>>> This looks like a bug in the Job Cluster implementation of Flink. It looks like it is taking the 5th checkpoint from the beginning based on the num-retained value. Note that it has the same job ID and does not scope to a new directory.
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/1dfdaa417ab7cdca9bef1efe6381c7eb67022aaf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L109
>>>>>>>
>>>>>>> Please tell me if this does not make sense.
>>>>>>>
>>>>>>> Vishal
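(Again for illustration: a minimal sketch of the flink-conf.yaml keys involved in the behaviour discussed above. The values and paths are placeholders, not the configuration actually used in this job.)

# Externalized checkpoints kept on durable storage (paths below are placeholders)
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
state.checkpoints.num-retained: 5
state.savepoints.dir: hdfs://namenode:8020/flink/savepoints

# ZooKeeper HA: the checkpoint metadata the JM consults on recovery
# (quorum, root, and cluster-id below are placeholders)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /my-job-cluster
high-availability.storageDir: hdfs://namenode:8020/flink/ha

The logs above are consistent with this setup: once the checkpoints node under the cluster-id is removed from ZooKeeper, recovery finds 0 checkpoints there and falls back to the savepoint passed on the entrypoint command line, rather than the checkpoints still retained under state.checkpoints.dir.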