[
https://issues.apache.org/jira/browse/KAFKA-6404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yu Gan updated KAFKA-6404:
--------------------------
Description:
*kafka broker version*: 0.10.0.1
*cluster level*: 200+ nodes, no acls, any clients in the same LAN could access
*situation*: someone uses a higher released version (such as 0.11.x) of
bin/kafka-console-consumer.sh with the parameter "--zookeeper" to continuously
consume a topic whose partitions are spread across all the brokers
*phenomenon*:
1.broker server log:
errors like: 1) Connection to 2 was disconnected before the response was read;
2) Shrinking ISR for partition [abc, 21] from 33,13,14 to 33;
3) ERROR Processor got uncaught exception. (kafka.network.Processor)
java.nio.BufferUnderflowException
2.common consumers keeping in rebalance status:
errors like:
1) c.p.b.f.l.c.FiberTopoWorkerThread : got uncaught exception
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the partitions to
another member. This means that the time between subsequent calls to poll() was
longer than the configured session.timeout.ms, which typically implies that the
poll loop is spending too much time message processing. You can address this
either by increasing the session timeout or by reducing the maximum size of
batches returned in poll() with max.poll.records.
2) java.lang.IllegalStateException: Correlation id for response (1246203) does
not match request (1246122)
*bad results*: the kafka cluster is left in an unhealthy state
*root cause*:
1) the OldConsumer after 0.10.1 sets requestVersion to 3 in
ConsumerFetcherThread.scala:
{code:java}
private val fetchRequestBuilder = new FetchRequestBuilder().
clientId(clientId).
replicaId(Request.OrdinaryConsumerId).
maxWait(config.fetchWaitMaxMs).
minBytes(config.fetchMinBytes).
requestVersion(3) // for now, the old consumer is pinned to the old message
format through the fetch request
{code}
but in 0.10.0.1 FetchRequest.CurrentVersion=2, so FetchRequest.readFrom does not
read the field "max_bytes" that was introduced in version 3:
{code:java}
def readFrom(buffer: ByteBuffer): FetchRequest = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = readShortString(buffer)
val replicaId = buffer.getInt
val maxWait = buffer.getInt
val minBytes = buffer.getInt
val topicCount = buffer.getInt
val pairs = (1 to topicCount).flatMap(_ => {
val topic = readShortString(buffer)
val partitionCount = buffer.getInt
(1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
val offset = buffer.getLong
val fetchSize = buffer.getInt
(TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset,
fetchSize))
})
})
FetchRequest(versionId, correlationId, clientId, replicaId, maxWait,
minBytes, Map(pairs:_*))
}
{code}
2) when FetchRequest.readFrom crashes with a throwable such as
"BufferUnderflowException" that is not one of (InvalidRequestException,
SchemaException), the socket is never closed;
SocketServer.processCompletedReceives:
{code:java}
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
try {
val openChannel = selector.channel(receive.source)
// Only methods that are safe to call on a disconnected channel should
be invoked on 'openOrClosingChannel'.
val openOrClosingChannel = if (openChannel != null) openChannel else
selector.closingChannel(receive.source)
val session = RequestChannel.Session(new
KafkaPrincipal(KafkaPrincipal.USER_TYPE,
openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
val req = RequestChannel.Request(processor = id, connectionId =
receive.source, session = session,
buffer = receive.payload, startTimeNanos = time.nanoseconds,
listenerName = listenerName, securityProtocol = securityProtocol)
requestChannel.sendRequest(req)
selector.mute(receive.source)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) =>
// note that even though we got an exception, we can assume that
receive.source is valid. Issues with constructing a valid receive object were
handled earlier
error(s"Closing socket for ${receive.source} because of error", e)
close(selector, receive.source)
}
}
}
{code}
*workaround, but not optimal*:
throw a known InvalidRequestException (or SchemaException, which may be more
suitable) in RequestChannel.scala:
{code:java}
/*// TODO: this will be removed once we migrated to client-side format
// for server-side request / response format
// NOTE: this map only includes the server-side request/response handlers.
Newer
// request types should only use the client-side versions which are parsed
with
// o.a.k.common.requests.AbstractRequest.getRequest()
private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) =>
RequestOrResponse]=
Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
)
// TODO: this will be removed once we migrated to client-side format
val requestObj =
keyToNameAndDeserializerMap.get(requestId).map(readFrom =>
readFrom(buffer)).orNull*/
val requestObj: RequestOrResponse = requestId match {
case ApiKeys.FETCH.id => getFetchRequest()
case ApiKeys.CONTROLLED_SHUTDOWN_KEY.id =>
ControlledShutdownRequest.readFrom(buffer)
case _ => null
}
def getFetchRequest(): FetchRequest = {
try{
FetchRequest.readFrom(buffer)
}catch {
case ex: Throwable =>
throw new InvalidRequestException(s"FetchRequest version for API key
not match server's "+ requestId + ": " + FetchRequest.CurrentVersion, ex)
}
}
{code}
was:
*kafka broker version*: 0.10.0.1
*cluster level*: 200+ nodes, no acls, any clients in the same LAN could access
*situation*: someone uses high released version (such as 0.11.x) of
bin/kafka-console-consumer.sh with parameter "--zookeeper" to continuously
consume a topic with partitions spread all the brokers
*phenomenon*:
1.broker server log:
errors like: 1) Connection to 2 was disconnected before the response was read;
2) Shrinking ISR for partition [abc, 21] from 33,13,14 to 33;
3) ERROR Processor got uncaught exception. (kafka.network.Processor)
java.nio.BufferUnderflowException
2.common consumers keeping in rebalance status:
errors like:
1) c.p.b.f.l.c.FiberTopoWorkerThread : got uncaught exception
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the partitions to
another member. This means that the time between subsequent calls to poll() was
longer than the configured session.timeout.ms, which typically implies that the
poll loop is spending too much time message processing. You can address this
either by increasing the session timeout or by reducing the maximum size of
batches returned in poll() with max.poll.records.
2) java.lang.IllegalStateException: Correlation id for response (1246203) does
not match request (1246122)
*bad results*: kafka cluster in sick
*root cause*:
1) OldConsumer after 0.10.1 in ConsumerFetcherThread.scala setting
requestVersion 3:
{code:java}
private val fetchRequestBuilder = new FetchRequestBuilder().
clientId(clientId).
replicaId(Request.OrdinaryConsumerId).
maxWait(config.fetchWaitMaxMs).
minBytes(config.fetchMinBytes).
requestVersion(3) // for now, the old consumer is pinned to the old message
format through the fetch request
{code}
but in 0.10.0.1 FetchRequest.CurrentVersion=2, FetchRequst.readFrom wouldn't
read the field "max_bytes" from version 3 :
{code:java}
def readFrom(buffer: ByteBuffer): FetchRequest = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = readShortString(buffer)
val replicaId = buffer.getInt
val maxWait = buffer.getInt
val minBytes = buffer.getInt
val topicCount = buffer.getInt
val pairs = (1 to topicCount).flatMap(_ => {
val topic = readShortString(buffer)
val partitionCount = buffer.getInt
(1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
val offset = buffer.getLong
val fetchSize = buffer.getInt
(TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset,
fetchSize))
})
})
FetchRequest(versionId, correlationId, clientId, replicaId, maxWait,
minBytes, Map(pairs:_*))
}
{code}
2) when the FetchRequst.readFrom crashed with throwable like
"BufferUnderflowException" not in "(InvalidRequestException, SchemaException)",
the socket wouldn't be closed;
SocketServer.processCompletedReceives:
{code:java}
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
try {
val openChannel = selector.channel(receive.source)
// Only methods that are safe to call on a disconnected channel should
be invoked on 'openOrClosingChannel'.
val openOrClosingChannel = if (openChannel != null) openChannel else
selector.closingChannel(receive.source)
val session = RequestChannel.Session(new
KafkaPrincipal(KafkaPrincipal.USER_TYPE,
openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
val req = RequestChannel.Request(processor = id, connectionId =
receive.source, session = session,
buffer = receive.payload, startTimeNanos = time.nanoseconds,
listenerName = listenerName, securityProtocol = securityProtocol)
requestChannel.sendRequest(req)
selector.mute(receive.source)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) =>
// note that even though we got an exception, we can assume that
receive.source is valid. Issues with constructing a valid receive object were
handled earlier
error(s"Closing socket for ${receive.source} because of error", e)
close(selector, receive.source)
}
}
}
{code}
*workaround but not the optimal*:
RequestChannel.scala:
{code:java}
/*// TODO: this will be removed once we migrated to client-side format
// for server-side request / response format
// NOTE: this map only includes the server-side request/response handlers.
Newer
// request types should only use the client-side versions which are parsed
with
// o.a.k.common.requests.AbstractRequest.getRequest()
private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) =>
RequestOrResponse]=
Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
)
// TODO: this will be removed once we migrated to client-side format
val requestObj =
keyToNameAndDeserializerMap.get(requestId).map(readFrom =>
readFrom(buffer)).orNull*/
val requestObj: RequestOrResponse = requestId match {
case ApiKeys.FETCH.id => getFetchRequest()
case ApiKeys.CONTROLLED_SHUTDOWN_KEY.id =>
ControlledShutdownRequest.readFrom(buffer)
case _ => null
}
def getFetchRequest(): FetchRequest = {
try{
FetchRequest.readFrom(buffer)
}catch {
case ex: Throwable =>
throw new InvalidRequestException(s"FetchRequest version for API key
not match server's "+ requestId + ": " + FetchRequest.CurrentVersion, ex)
}
}
{code}
> OldConsumer FetchRequest apiVersion not match resulting in broker
> RequestHandler socket leak
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-6404
> URL: https://issues.apache.org/jira/browse/KAFKA-6404
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.0.1
> Reporter: Yu Gan
> Priority: Critical
>
> *kafka broker version*: 0.10.0.1
> *cluster level*: 200+ nodes, no acls, any clients in the same LAN could access
> *situation*: someone uses high released version (such as 0.11.x) of
> bin/kafka-console-consumer.sh with parameter "--zookeeper" to continuously
> consume a topic with partitions spread all the brokers
> *phenomenon*:
> 1.broker server log:
> errors like: 1) Connection to 2 was disconnected before the response was read;
> 2) Shrinking ISR for partition [abc, 21] from 33,13,14 to 33;
> 3) ERROR Processor got uncaught exception. (kafka.network.Processor)
> java.nio.BufferUnderflowException
> 2.common consumers keeping in rebalance status:
> errors like:
> 1) c.p.b.f.l.c.FiberTopoWorkerThread : got uncaught exception
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the partitions
> to another member. This means that the time between subsequent calls to
> poll() was longer than the configured session.timeout.ms, which typically
> implies that the poll loop is spending too much time message processing. You
> can address this either by increasing the session timeout or by reducing the
> maximum size of batches returned in poll() with max.poll.records.
> 2) java.lang.IllegalStateException: Correlation id for response (1246203)
> does not match request (1246122)
> *bad results*: kafka cluster in sick
> *root cause*:
> 1) OldConsumer after 0.10.1 in ConsumerFetcherThread.scala setting
> requestVersion 3:
> {code:java}
> private val fetchRequestBuilder = new FetchRequestBuilder().
> clientId(clientId).
> replicaId(Request.OrdinaryConsumerId).
> maxWait(config.fetchWaitMaxMs).
> minBytes(config.fetchMinBytes).
> requestVersion(3) // for now, the old consumer is pinned to the old
> message format through the fetch request
> {code}
> but in 0.10.0.1 FetchRequest.CurrentVersion=2, FetchRequst.readFrom wouldn't
> read the field "max_bytes" from version 3 :
> {code:java}
> def readFrom(buffer: ByteBuffer): FetchRequest = {
> val versionId = buffer.getShort
> val correlationId = buffer.getInt
> val clientId = readShortString(buffer)
> val replicaId = buffer.getInt
> val maxWait = buffer.getInt
> val minBytes = buffer.getInt
> val topicCount = buffer.getInt
> val pairs = (1 to topicCount).flatMap(_ => {
> val topic = readShortString(buffer)
> val partitionCount = buffer.getInt
> (1 to partitionCount).map(_ => {
> val partitionId = buffer.getInt
> val offset = buffer.getLong
> val fetchSize = buffer.getInt
> (TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset,
> fetchSize))
> })
> })
> FetchRequest(versionId, correlationId, clientId, replicaId, maxWait,
> minBytes, Map(pairs:_*))
> }
> {code}
> 2) when the FetchRequst.readFrom crashed with throwable like
> "BufferUnderflowException" not in "(InvalidRequestException,
> SchemaException)", the socket wouldn't be closed;
> SocketServer.processCompletedReceives:
> {code:java}
> private def processCompletedReceives() {
> selector.completedReceives.asScala.foreach { receive =>
> try {
> val openChannel = selector.channel(receive.source)
> // Only methods that are safe to call on a disconnected channel
> should be invoked on 'openOrClosingChannel'.
> val openOrClosingChannel = if (openChannel != null) openChannel else
> selector.closingChannel(receive.source)
> val session = RequestChannel.Session(new
> KafkaPrincipal(KafkaPrincipal.USER_TYPE,
> openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
> val req = RequestChannel.Request(processor = id, connectionId =
> receive.source, session = session,
> buffer = receive.payload, startTimeNanos = time.nanoseconds,
> listenerName = listenerName, securityProtocol = securityProtocol)
> requestChannel.sendRequest(req)
> selector.mute(receive.source)
> } catch {
> case e @ (_: InvalidRequestException | _: SchemaException) =>
> // note that even though we got an exception, we can assume that
> receive.source is valid. Issues with constructing a valid receive object were
> handled earlier
> error(s"Closing socket for ${receive.source} because of error", e)
> close(selector, receive.source)
> }
> }
> }
> {code}
> *workaround but not the optimal*:
> throw a known InvalidRequestException(or SchemaException more suitable) in
> RequestChannel.scala:
> {code:java}
> /*// TODO: this will be removed once we migrated to client-side format
> // for server-side request / response format
> // NOTE: this map only includes the server-side request/response
> handlers. Newer
> // request types should only use the client-side versions which are
> parsed with
> // o.a.k.common.requests.AbstractRequest.getRequest()
> private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) =>
> RequestOrResponse]=
> Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
> ApiKeys.CONTROLLED_SHUTDOWN_KEY.id ->
> ControlledShutdownRequest.readFrom
> )
> // TODO: this will be removed once we migrated to client-side format
> val requestObj =
> keyToNameAndDeserializerMap.get(requestId).map(readFrom =>
> readFrom(buffer)).orNull*/
> val requestObj: RequestOrResponse = requestId match {
> case ApiKeys.FETCH.id => getFetchRequest()
> case ApiKeys.CONTROLLED_SHUTDOWN_KEY.id =>
> ControlledShutdownRequest.readFrom(buffer)
> case _ => null
> }
> def getFetchRequest(): FetchRequest = {
> try{
> FetchRequest.readFrom(buffer)
> }catch {
> case ex: Throwable =>
> throw new InvalidRequestException(s"FetchRequest version for API
> key not match server's "+ requestId + ": " + FetchRequest.CurrentVersion, ex)
> }
> }
> {code}
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)