gaoyunhaii commented on pull request #16655: URL: https://github.com/apache/flink/pull/16655#issuecomment-894930752
Hi @StephanEwen, many thanks for the review!

> You can only change the JobGraph operator chains from a checkpoint if it has no partially finished tasks.

I also agree that keeping a simple rule would be easier to implement and maintain, and that such a rule would work if implementing the checks turns out to be difficult. The main blocker seems to be that we cannot easily obtain the original job graph to do the comparison.

> The code only works with the generated Operator ID, and ignores the user-defined operator IDs. I am not sure this is correct, I think we need to register finished state for both generated and user-defined IDs, because on that level, we don't know under which ID the operator will communicate its state. @gaoyunhaii have you looked into this?

For `OperatorIDPair`, the current names might be somewhat misleading. The `generatedOperatorID` is derived from the `uid`, or from the natural order of the operator if no `uid` is set (`StreamGraphHasherV2`). The `userDefinedOperatorID` is derived from the `uidHash` (`StreamGraphUserHashHasher`). The latter is only meant for one case: users forgot to set a custom `uid` on the first run but still want to restore from that state. They can then specify the `uidHash` directly, and it is used as the operator ID as-is.

Thus, in the current logic, `userDefinedOperatorID` is only consulted during restore to look up the corresponding state, whereas new checkpoints always use `generatedOperatorID`. We follow the same policy when storing the finished status.

> I think for a good design, the PendingCheckpoint should not need to be aware of ExecutionJobVertex and iterate over the status or implement the logic to check for finished state.

I also agree with the refactoring of `CheckpointPlan` and `PendingCheckpoint`, and many thanks for the suggestions! I'll update the PR accordingly.

From my side, we can do one of the following things:

> 1. Bump the metadata format version.
> This is relatively simple.
> The tests are the most work here. We need to create snapshots of V3 metadata that we reload with the latest setup to ensure backwards compatibility.
>
> We can use the trick here, but then we should not do this inline, but have two explicit static methods that we use, so that this is explicit:
> - `int encodeSubtaskIndex(int subtaskIndex, boolean isFinished)`
> - `SubtaskAndStatus decodeSubtaskAndStatus(int value)` (where `SubtaskAndStatus` is like an `(int, boolean)` tuple.)

For the state metadata format, I still lean towards the second option. For now we would not bump the version, and we would extract the encoding and decoding into separate methods. We can still consider upgrading the format later if other requirements come up.
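A minimal sketch of the two helpers suggested for the second option follows. The concrete encoding is an assumption for illustration and not necessarily what the PR uses: the bitwise complement of the index marks a finished subtask. The class name `SubtaskIndexCodec` is hypothetical, and `SubtaskAndStatus` is modeled as a small value class.

```java
// Hypothetical sketch: encoding a finished subtask as ~index (== -(index + 1)) is an
// assumption for illustration, not necessarily the encoding used in the PR.
final class SubtaskIndexCodec {

    /** A (subtaskIndex, isFinished) pair, playing the role of the proposed SubtaskAndStatus. */
    static final class SubtaskAndStatus {
        final int subtaskIndex;
        final boolean isFinished;

        SubtaskAndStatus(int subtaskIndex, boolean isFinished) {
            this.subtaskIndex = subtaskIndex;
            this.isFinished = isFinished;
        }
    }

    private SubtaskIndexCodec() {}

    /** Running subtasks keep their plain non-negative index; finished ones become negative. */
    static int encodeSubtaskIndex(int subtaskIndex, boolean isFinished) {
        if (subtaskIndex < 0) {
            throw new IllegalArgumentException("subtask index must be non-negative: " + subtaskIndex);
        }
        // ~i == -(i + 1), so finished subtask 0 becomes -1 and the flag is not lost for index 0.
        return isFinished ? ~subtaskIndex : subtaskIndex;
    }

    /** Inverse of encodeSubtaskIndex: a negative value means "finished". */
    static SubtaskAndStatus decodeSubtaskAndStatus(int value) {
        return value >= 0
                ? new SubtaskAndStatus(value, false)
                : new SubtaskAndStatus(~value, true);
    }

    public static void main(String[] args) {
        int encoded = encodeSubtaskIndex(3, true);
        SubtaskAndStatus decoded = decodeSubtaskAndStatus(encoded);
        // prints: -4 -> index=3, finished=true
        System.out.println(encoded + " -> index=" + decoded.subtaskIndex + ", finished=" + decoded.isFinished);
    }
}
```

Running subtasks keep their plain index. Assuming existing V3 metadata only ever contains non-negative indices, it would still decode with `isFinished == false`, which is what would let this option avoid a version bump.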
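The `OperatorIDPair` policy described earlier in this comment can be sketched as a simplified, hypothetical model:

- State lookup on restore prefers the user-defined ID (from `uidHash`) and falls back to the generated ID.
- New checkpoints, including the finished status, are always keyed by the generated ID.

Operator IDs are plain strings here rather than Flink's `OperatorID`. The names `OperatorIdPolicy`, `IdPair`, `idForRestore` and `idForCheckpoint` are illustrative only, not Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of the two IDs in OperatorIDPair; not Flink's actual classes.
final class OperatorIdPolicy {

    static final class IdPair {
        final String generatedOperatorId;             // from uid, or from the operator's position if no uid
        final Optional<String> userDefinedOperatorId; // from an explicitly specified uidHash, if any

        IdPair(String generatedOperatorId, Optional<String> userDefinedOperatorId) {
            this.generatedOperatorId = generatedOperatorId;
            this.userDefinedOperatorId = userDefinedOperatorId;
        }
    }

    /** Restore: prefer the uidHash-based ID so state written by an earlier run can be found. */
    static String idForRestore(IdPair ids) {
        return ids.userDefinedOperatorId.orElse(ids.generatedOperatorId);
    }

    /** New checkpoint (including the finished status): always keyed by the generated ID. */
    static String idForCheckpoint(IdPair ids) {
        return ids.generatedOperatorId;
    }

    public static void main(String[] args) {
        Map<String, String> restoredState = new HashMap<>();
        restoredState.put("hash-from-first-run", "state written by the first run");

        IdPair ids = new IdPair("generated-123", Optional.of("hash-from-first-run"));
        System.out.println(restoredState.get(idForRestore(ids))); // prints: state written by the first run
        System.out.println(idForCheckpoint(ids));                  // prints: generated-123
    }
}
```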