RivenSun created KAFKA-13857:
--------------------------------
Summary: The listOffsets method of KafkaAdminClient should support
returning logEndOffset of topicPartition
Key: KAFKA-13857
URL: https://issues.apache.org/jira/browse/KAFKA-13857
Project: Kafka
Issue Type: Bug
Components: admin
Reporter: RivenSun
The broker currently handles a LIST_OFFSETS request through the following call chain:
{code:java}
KafkaApis.handleListOffsetRequest() ->
KafkaApis.handleListOffsetRequestV1AndAbove() ->
ReplicaManager.fetchOffsetForTimestamp() ->
Partition.fetchOffsetForTimestamp(){code}
In the last method above, when the caller does not pass an isolationLevel
value, the server side already supports returning localLog.logEndOffset as the
last fetchable offset:
{code:java}
val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
}
{code}
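The selection above can be restated as a small standalone Java sketch. The class, the `LogOffsets` holder, and the sample offsets are hypothetical stand-ins for the broker's log state, not Kafka classes; an empty `Optional` plays the role of Scala's `None`.

```java
import java.util.Optional;

class LastFetchableOffsetSketch {
    // Stand-in for org.apache.kafka.common.IsolationLevel (illustration only).
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    // Hypothetical snapshot of the three offsets tracked for a partition.
    static final class LogOffsets {
        final long lastStableOffset;
        final long highWatermark;
        final long logEndOffset;

        LogOffsets(long lastStableOffset, long highWatermark, long logEndOffset) {
            this.lastStableOffset = lastStableOffset;
            this.highWatermark = highWatermark;
            this.logEndOffset = logEndOffset;
        }
    }

    // Mirrors the Scala match: no isolation level selects the log end offset.
    static long lastFetchableOffset(Optional<IsolationLevel> isolationLevel, LogOffsets log) {
        if (!isolationLevel.isPresent()) {
            return log.logEndOffset;
        }
        switch (isolationLevel.get()) {
            case READ_COMMITTED:
                return log.lastStableOffset;
            case READ_UNCOMMITTED:
                return log.highWatermark;
            default:
                throw new IllegalStateException("Unexpected level " + isolationLevel.get());
        }
    }

    public static void main(String[] args) {
        LogOffsets log = new LogOffsets(90L, 100L, 120L); // made-up sample offsets
        System.out.println(lastFetchableOffset(Optional.of(IsolationLevel.READ_COMMITTED), log));
        System.out.println(lastFetchableOffset(Optional.of(IsolationLevel.READ_UNCOMMITTED), log));
        System.out.println(lastFetchableOffset(Optional.empty(), log));
    }
}
```

With these sample offsets, only the empty case reaches the log end offset, which is the value this issue asks AdminClient to be able to request.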
KafkaAdminClient is an operations and maintenance tool, so its listOffsets()
method *should behave differently from the offset lookup methods provided by
KafkaConsumer (offsetsForTimes, beginningOffsets, endOffsets), and it should
not be limited by the value of {color:#FF0000}isolationLevel{color} in the
ListOffsetsOptions parameter.*
In the current KafkaAdminClient.listOffsets() method, both the AdminClient and
the server consider isolationLevel as a required parameter:
1) If the caller passes new ListOffsetsOptions(null), a NullPointerException
is thrown when AdminClient executes the listOffsets() method:
{code:java}
ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
2) The server-side logic that converts isolationLevel does not yet handle the
case where the client sends a value that is neither READ_UNCOMMITTED nor
READ_COMMITTED:
{code:java}
val isolationLevelOpt = if (isClientRequest)
  Some(offsetRequest.isolationLevel)
else
  None
{code}
{code:java}
public IsolationLevel isolationLevel() {
    return IsolationLevel.forId(data.isolationLevel());
}
{code}
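The two cases above can be reproduced in isolation with a minimal sketch. The enum below is a stand-in for IsolationLevel, not the Kafka class, and the `encode`/`decode` helpers are hypothetical names for the client-side and server-side steps. The sketch assumes that forId rejects ids outside the two known values.

```java
class IsolationLevelRequiredSketch {
    // Stand-in for org.apache.kafka.common.IsolationLevel (illustration only).
    enum IsolationLevel {
        READ_UNCOMMITTED((byte) 0),
        READ_COMMITTED((byte) 1);

        private final byte id;

        IsolationLevel(byte id) {
            this.id = id;
        }

        byte id() {
            return id;
        }

        // Assumed behavior: any id other than 0 or 1 is rejected.
        static IsolationLevel forId(byte id) {
            switch (id) {
                case 0:
                    return READ_UNCOMMITTED;
                case 1:
                    return READ_COMMITTED;
                default:
                    throw new IllegalArgumentException("Unknown isolation level " + id);
            }
        }
    }

    // Case 1 (hypothetical client step): dereferencing a null level throws.
    static byte encode(IsolationLevel isolationLevel) {
        return isolationLevel.id();
    }

    // Case 2 (hypothetical server step): decoding an id with no enum constant.
    static IsolationLevel decode(byte wireId) {
        return IsolationLevel.forId(wireId);
    }

    public static void main(String[] args) {
        try {
            encode(null);
        } catch (NullPointerException e) {
            System.out.println("client: NullPointerException");
        }
        try {
            decode((byte) 2);
        } catch (IllegalArgumentException e) {
            System.out.println("server: " + e.getMessage());
        }
    }
}
```

Under these assumptions, neither side gives a caller a way to say "no isolation level", so the None branch in Partition.fetchOffsetForTimestamp() is unreachable for client requests.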
h1. Solution:
Add a new enum constant `NONE` to IsolationLevel, dedicated to the
AdminClient.listOffsets() method.
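One possible shape of this proposal, as a sketch only: the enum below is a stand-in rather than the Kafka class, the id value 2 for `NONE` is an assumption made for illustration, and `toServerOption` is a hypothetical helper showing how the server could map `NONE` onto the existing None branch.

```java
import java.util.Optional;

class IsolationLevelNoneSketch {
    // Stand-in enum; NONE and its id (2) are hypothetical.
    enum IsolationLevel {
        READ_UNCOMMITTED((byte) 0),
        READ_COMMITTED((byte) 1),
        NONE((byte) 2);

        private final byte id;

        IsolationLevel(byte id) {
            this.id = id;
        }

        byte id() {
            return id;
        }

        // Look up a constant by wire id; unknown ids are still rejected.
        static IsolationLevel forId(byte id) {
            for (IsolationLevel level : values()) {
                if (level.id == id) {
                    return level;
                }
            }
            throw new IllegalArgumentException("Unknown isolation level " + id);
        }
    }

    // Hypothetical server-side conversion: NONE becomes "no isolation level",
    // which corresponds to the branch that returns localLog.logEndOffset.
    static Optional<IsolationLevel> toServerOption(byte wireId) {
        IsolationLevel level = IsolationLevel.forId(wireId);
        return level == IsolationLevel.NONE ? Optional.empty() : Optional.of(level);
    }

    public static void main(String[] args) {
        System.out.println(toServerOption(IsolationLevel.NONE.id()).isPresent());
        System.out.println(toServerOption(IsolationLevel.READ_COMMITTED.id()).get());
    }
}
```

Keeping the mapping on the server side means the existing match on isolationLevel would not need a new case; whether that is preferable to handling `NONE` directly in the match is a design choice this sketch does not settle.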
--
This message was sent by Atlassian Jira
(v8.20.7#820007)