[ https://issues.apache.org/jira/browse/KAFKA-13148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson resolved KAFKA-13148.
-------------------------------------
    Resolution: Fixed

Closing this since it was fixed by KAFKA-12158.

> Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-13148
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13148
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, kraft
>            Reporter: Jose Armando Garcia Sancio
>            Assignee: Niket Goel
>            Priority: Major
>              Labels: kip-500
>
> In some cases the RaftClient will return Long.MAX_VALUE:
> {code:java}
>     /**
>      * Append a list of records to the log. The write will be scheduled for some time
>      * in the future. There is no guarantee that appended records will be written to
>      * the log and eventually committed. However, it is guaranteed that if any of the
>      * records become committed, then all of them will be.
>      *
>      * If the provided current leader epoch does not match the current epoch, which
>      * is possible when the state machine has yet to observe the epoch change, then
>      * this method will return {@link Long#MAX_VALUE} to indicate an offset which is
>      * not possible to become committed. The state machine is expected to discard all
>      * uncommitted entries after observing an epoch change.
>      *
>      * @param epoch the current leader epoch
>      * @param records the list of records to append
>      * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records could
>      *         not be committed; null if no memory could be allocated for the batch at this time
>      * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum
>      *         batch size; if this exception is thrown none of the elements in records were
>      *         committed
>      */
>     Long scheduleAtomicAppend(int epoch, List<T> records);
> {code}
> The controller doesn't handle this case:
> {code:java}
>     // If the operation returned a batch of records, those records need to be
>     // written before we can return our result to the user. Here, we hand off
>     // the batch of records to the raft client. They will be written out
>     // asynchronously.
>     final long offset;
>     if (result.isAtomic()) {
>         offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
>     } else {
>         offset = raftClient.scheduleAppend(controllerEpoch, result.records());
>     }
>     op.processBatchEndOffset(offset);
>     writeOffset = offset;
>     resultAndOffset = ControllerResultAndOffset.of(offset, result);
>     for (ApiMessageAndVersion message : result.records()) {
>         replay(message.message(), Optional.empty(), offset);
>     }
>     snapshotRegistry.getOrCreateSnapshot(offset);
>     log.debug("Read-write operation {} will be completed when the log " +
>         "reaches offset {}.", this, resultAndOffset.offset());
> {code}
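For illustration only, a minimal sketch of the kind of guard the description implies. This is not the actual KAFKA-12158 fix, and the class and method names below are hypothetical, not Kafka API; it only encodes the javadoc contract quoted above, namely that null and Long.MAX_VALUE are both failure signals rather than real offsets.

{code:java}
// Hypothetical helper (names are illustrative, not Kafka API): validate the
// value returned by RaftClient#scheduleAtomicAppend before recording it as a
// write offset.
final class AppendOffsets {
    private AppendOffsets() { }

    static long checked(Long offset, int epoch) {
        if (offset == null) {
            // Per the quoted javadoc, null means no memory could be
            // allocated for the batch at this time.
            throw new IllegalStateException(
                "Could not allocate a batch for the append in epoch " + epoch);
        }
        if (offset == Long.MAX_VALUE) {
            // Per the quoted javadoc, Long.MAX_VALUE means the supplied
            // leader epoch was stale, so the records can never be committed.
            throw new IllegalStateException(
                "Append rejected because leader epoch " + epoch + " is stale");
        }
        return offset;
    }
}
{code}

In the controller's write path, a check like this would wrap the scheduleAtomicAppend/scheduleAppend call so that op.processBatchEndOffset(offset), the replay loop, and the snapshot registry only ever see an offset that can actually become committed.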