[ https://issues.apache.org/jira/browse/KAFKA-15247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dave Crighton updated KAFKA-15247:
----------------------------------
    Priority: Major  (was: Minor)

OutOfMemoryError in SaslClientAuthenticator during server restart
-----------------------------------------------------------------

                Key: KAFKA-15247
                URL: https://issues.apache.org/jira/browse/KAFKA-15247
            Project: Kafka
         Issue Type: Bug
         Components: clients
   Affects Versions: 3.5.1
           Reporter: Dave Crighton
           Priority: Major
             Labels: patch
        Attachments: 0001-defensive-code-for-rogue-packets-during-server-resta.patch
  Original Estimate: 24h
 Remaining Estimate: 24h

We embed the Kafka client in IBM App Connect Enterprise in order to provide Kafka consume and produce functionality. This product is a little like an app server in that it may host multiple workloads, including some which may not use the Kafka functionality.

When the Kafka server is installed in an OpenShift environment, we are seeing cases where the clients receive OutOfMemoryErrors due to single large (1.2 GB) byte buffers being allocated by the client.
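To illustrate the failure mode (this is an illustration, not Kafka source): the client reads a 4-byte big-endian size prefix before allocating a receive buffer, so four stray bytes from a peer that is not speaking the Kafka protocol decode to an arbitrary, possibly enormous, length. The byte values below are hypothetical, chosen to show how a ~1.2 GB request can arise.

```java
import java.nio.ByteBuffer;

// Illustration only: how four rogue bytes, interpreted as a big-endian
// length prefix, can decode to a ~1.2 GB allocation request.
public class RoguePrefix {

    // Decode the first four bytes of a frame as its declared payload length.
    static int declaredLength(byte[] firstFourBytes) {
        return ByteBuffer.wrap(firstFourBytes).getInt(); // big-endian by default
    }

    public static void main(String[] args) {
        // Hypothetical bytes from a non-Kafka peer (e.g. stray route traffic).
        byte[] rogue = {0x4C, (byte) 0xCC, (byte) 0xCC, (byte) 0xCC};
        // 0x4CCCCCCC = 1,288,490,188 bytes, i.e. roughly 1.2 GiB.
        System.out.println(declaredLength(rogue)); // prints 1288490188
    }
}
```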
From research, this appears to be a known issue when a plaintext client is configured to attempt a connection to a TLS-secured endpoint. In this instance, however, we see successful communication via TLS, and then, when the Kafka server is restarted (or connectivity is broken), both the consumers and producers can throw OutOfMemoryErrors with the following stacks:

Producer
--------

{noformat}
4XESTACKTRACE at java/nio/HeapByteBuffer.<init>(HeapByteBuffer.java:57(Compiled Code))
4XESTACKTRACE at java/nio/ByteBuffer.allocate(ByteBuffer.java:335(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/memory/MemoryPool$1.tryAllocate(MemoryPool.java:30(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/network/NetworkReceive.readFrom(NetworkReceive.java:102(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:475(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:572(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:250(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/network/KafkaChannel.prepare(KafkaChannel.java:181(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/network/Selector.pollSelectionKeys(Selector.java:543(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/network/Selector.poll(Selector.java:481(Compiled Code))
4XESTACKTRACE at org/apache/kafka/clients/NetworkClient.poll(NetworkClient.java:571(Compiled Code))
4XESTACKTRACE at org/apache/kafka/clients/producer/internals/Sender.runOnce(Sender.java:328(Compiled Code))
4XESTACKTRACE at org/apache/kafka/clients/producer/internals/Sender.run(Sender.java:243(Compiled Code))
4XESTACKTRACE at java/lang/Thread.run(Thread.java:830)
{noformat}

Consumer
--------

{noformat}
3XMTHREADINFO3 Java callstack:
4XESTACKTRACE at java/nio/HeapByteBuffer.<init>(HeapByteBuffer.java:57)
4XESTACKTRACE at java/nio/ByteBuffer.allocate(ByteBuffer.java:335)
4XESTACKTRACE at org/apache/kafka/common/memory/MemoryPool$1.tryAllocate(MemoryPool.java:30)
4XESTACKTRACE at org/apache/kafka/common/network/NetworkReceive.readFrom(NetworkReceive.java:113)
4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:475)
4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:572)
4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:250)
4XESTACKTRACE at org/apache/kafka/common/network/KafkaChannel.prepare(KafkaChannel.java:181)
4XESTACKTRACE at org/apache/kafka/common/network/Selector.pollSelectionKeys(Selector.java:543)
4XESTACKTRACE at org/apache/kafka/common/network/Selector.poll(Selector.java:481)
4XESTACKTRACE at org/apache/kafka/clients/NetworkClient.poll(NetworkClient.java:551)
4XESTACKTRACE at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
4XESTACKTRACE at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
4XESTACKTRACE at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
4XESTACKTRACE at org/apache/kafka/clients/consumer/internals/Fetcher.getTopicMetadata(Fetcher.java:374)
4XESTACKTRACE at org/apache/kafka/clients/consumer/KafkaConsumer.partitionsFor(KafkaConsumer.java:1949)
4XESTACKTRACE at org/apache/kafka/clients/consumer/KafkaConsumer.partitionsFor(KafkaConsumer.java:1917)
4XESTACKTRACE at com/ibm/broker/connector/kafka/KafkaIIBConsumer.initialise(KafkaIIBConsumer.java:177)
4XESTACKTRACE at com/ibm/broker/connector/kafka/KafkaIIBConsumer.start(KafkaIIBConsumer.java:512)
5XESTACKTRACE (entered lock: com/ibm/broker/connector/kafka/KafkaIIBConsumer@0x00000000C0A94038, entry count: 1)
4XESTACKTRACE at com/ibm/broker/connector/kafka/KafkaInputConnector.start(KafkaInputConnector.java:250)
{noformat}

We believe that when the Kafka server goes down in the RHOS environment, the route remains available for a short period, and the SaslClientAuthenticator can receive rogue packets which it interprets as a length to read off the stream.

For the consumer code, since there is application code on the stack, we were able to implement a workaround by catching the OOM; on the producer side, however, the entire stack is Kafka client code.

I looked at the SaslClientAuthenticator code and saw that its use of the network buffer is unbounded, so I applied two patches to this code. The first limits the buffer size for authentication to 10 MB; the second catches the OOM and fails authentication instead.

Using the patched client, the customer has gone from being able to recreate this on at least one app server for every Kafka server restart to not being able to reproduce the issue at all.

I am happy to submit a PR, but I wanted to get feedback before doing so. For instance: is 10 MB a suitable maximum buffer size for auth? Should the maximum perhaps be configurable instead, and if so, what is best practice for providing this configuration?

Secondly, catching the OOM doesn't feel like best practice to me; however, without doing this the entire application fails due to the aggressive allocation of byte buffers in the SaslClientAuthenticator. Is there an alternative I should be considering?
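The two defensive measures described above could be sketched roughly as follows. This is a minimal, self-contained sketch under my own naming, not the attached patch or Kafka's actual code; `MAX_AUTH_RECEIVE_BYTES` and `AuthenticationFailedException` here are stand-ins for whatever limit and failure path the community prefers.

```java
import java.nio.ByteBuffer;

// Sketch of the two defensive measures: bound the receive size during SASL
// authentication, and convert an allocation failure into an authentication
// failure instead of letting the OutOfMemoryError propagate.
public class BoundedAuthReceive {

    // Hypothetical cap; the report proposes 10 MB as a starting point.
    static final int MAX_AUTH_RECEIVE_BYTES = 10 * 1024 * 1024;

    // Stand-in for however auth failure is surfaced to the caller.
    static class AuthenticationFailedException extends RuntimeException {
        AuthenticationFailedException(String msg) { super(msg); }
        AuthenticationFailedException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Allocate a buffer for a length-prefixed auth response. Rejects
    // implausible sizes up front, and turns an OutOfMemoryError from the
    // allocation into an auth failure rather than killing the thread.
    static ByteBuffer allocateForAuth(int declaredSize) {
        if (declaredSize < 0 || declaredSize > MAX_AUTH_RECEIVE_BYTES) {
            throw new AuthenticationFailedException(
                "Peer declared " + declaredSize + " bytes for an auth response; "
                + "refusing (limit " + MAX_AUTH_RECEIVE_BYTES + ")");
        }
        try {
            return ByteBuffer.allocate(declaredSize);
        } catch (OutOfMemoryError e) {
            // Second line of defence: fail auth, not the whole process.
            throw new AuthenticationFailedException(
                "Could not allocate " + declaredSize + " bytes for auth response", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(allocateForAuth(512).capacity()); // normal auth-sized frame: 512
        try {
            allocateForAuth(1288490188); // ~1.2 GB rogue length
        } catch (AuthenticationFailedException e) {
            System.out.println("auth failed, client survives");
        }
    }
}
```

The up-front size check makes the OOM catch almost redundant for the rogue-packet case, but keeping both guards against a genuinely low-memory client heap as well.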
I realise this issue has been raised before, and in the case of a mis-configuration it was not considered a bug. In this instance, however, at least in the customer's environment, the configuration is actually correct, and the error leaves clients unable to tolerate a server failure.

I appreciate any guidance you can give me on how to get a change committed to prevent the problem.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)