Hi all,
While reading the source code I came across the following snippet inside the initializeState method:

    if (context.isRestored()) {
        LOG.info("{} - restoring state", name());
        for (State<TXN, CONTEXT> operatorState : state.get()) {
            userContext = operatorState.getContext();
            List<TransactionHolder<TXN>> recoveredTransactions =
                operatorState.getPendingCommitTransactions();
            List<TXN> handledTransactions =
                new ArrayList<>(recoveredTransactions.size() + 1);
            for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
                // If this fails to succeed eventually, there is actually data loss
                recoverAndCommitInternal(recoveredTransaction);
                handledTransactions.add(recoveredTransaction.handle);
                LOG.info("{} committed recovered transaction {}", name(),
                    recoveredTransaction);
            }

            {
                TXN transaction = operatorState.getPendingTransaction().handle;
                recoverAndAbort(transaction);
                handledTransactions.add(transaction);
                LOG.info("{} aborted recovered transaction {}", name(),
                    operatorState.getPendingTransaction());
            }

            if (userContext.isPresent()) {
                finishRecoveringContext(handledTransactions);
                recoveredUserContext = true;
            }
        }
    }

Given the above, I would like to ask why the following code

    TXN transaction = operatorState.getPendingTransaction().handle;
    recoverAndAbort(transaction);
    handledTransactions.add(transaction);
    LOG.info("{} aborted recovered transaction {}", name(),
        operatorState.getPendingTransaction());

needs to be wrapped in curly braces. Reading through the code, I could not see any reason why it has to be enclosed in {}.
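For context, as far as I understand a bare block in Java only opens a new local scope, so the only effect I can see is that the `transaction` variable is not visible after the closing brace. Below is a minimal sketch of that behaviour. The class, method, and string values are hypothetical and not taken from Flink; they only illustrate the scoping rule.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, unrelated to Flink's actual code.
class BareBlockScopeDemo {

    // Collects two "handles", each declared inside its own bare block.
    static List<String> handle() {
        List<String> handled = new ArrayList<>();

        {
            // 'transaction' exists only until the closing brace of this block.
            String transaction = "pending-1";
            handled.add(transaction);
        }

        {
            // The same name can be declared again, because the first
            // 'transaction' went out of scope at the end of its block.
            String transaction = "pending-2";
            handled.add(transaction);
        }

        // Referring to 'transaction' here would be a compile-time error.
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(handle()); // prints [pending-1, pending-2]
    }
}
```

So the braces do not change what the code does at runtime; they only limit where the local variable can be used.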
Any explanation would be much appreciated!



