[ https://issues.apache.org/jira/browse/KAFKA-15247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dave Crighton updated KAFKA-15247:
----------------------------------
    Priority: Major  (was: Minor)

> OutOfMemoryError in SaslClientAuthenticator during server restart 
> ------------------------------------------------------------------
>
>                 Key: KAFKA-15247
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15247
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.5.1
>            Reporter: Dave Crighton
>            Priority: Major
>              Labels: patch
>         Attachments: 0001-defensive-code-for-rogue-packets-during-server-resta.patch
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We embed the Kafka client in IBM App Connect Enterprise to provide Kafka 
> consume and produce functionality. This product is a little like an app 
> server in that it may host multiple workloads, including some that may not 
> use the Kafka functionality.
>  
> When the Kafka server is installed in an OpenShift environment, we are seeing 
> cases where the clients receive OutOfMemoryErrors because the client 
> allocates single large (1.2 GB) byte buffers.
>  
> From our research, this resembles a known issue that occurs when a plaintext 
> client is configured to connect to a TLS-secured endpoint. In this instance, 
> however, the clients first communicate successfully via TLS, and then, when 
> the Kafka server is restarted (or connectivity is broken), both the consumers 
> and the producers can throw OutOfMemoryErrors with the following stacks:
>  
> Producer
> ------------
>  
> {{4XESTACKTRACE                at java/nio/HeapByteBuffer.<init>(HeapByteBuffer.java:57(Compiled Code))}}
> {{4XESTACKTRACE                at java/nio/ByteBuffer.allocate(ByteBuffer.java:335(Compiled Code))}}
> {{4XESTACKTRACE                at org/apache/kafka/common/memory/MemoryPool$1.tryAllocate(MemoryPool.java:30(Compiled Code))}}
> {{4XESTACKTRACE                at org/apache/kafka/common/network/NetworkReceive.readFrom(NetworkReceive.java:102(Compiled Code))}}
> {{4XESTACKTRACE                at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:475(Compiled Code))}}
> {{4XESTACKTRACE                at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:572(Compiled Code))}}
> {{4XESTACKTRACE                at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:250(Compiled Code))}}
> {{4XESTACKTRACE                at org/apache/kafka/common/network/KafkaChannel.prepare(KafkaChannel.java:181(Compiled Code))}}
> {{4XESTACKTRACE                at org/apache/kafka/common/network/Selector.pollSelectionKeys(Selector.java:543(Compiled Code))}}
> {{4XESTACKTRACE                at org/apache/kafka/common/network/Selector.poll(Selector.java:481(Compiled Code))}}
> {{4XESTACKTRACE                at org/apache/kafka/clients/NetworkClient.poll(NetworkClient.java:571(Compiled Code))}}
> {{4XESTACKTRACE                at org/apache/kafka/clients/producer/internals/Sender.runOnce(Sender.java:328(Compiled Code))}}
> {{4XESTACKTRACE                at org/apache/kafka/clients/producer/internals/Sender.run(Sender.java:243(Compiled Code))}}
> {{4XESTACKTRACE                at java/lang/Thread.run(Thread.java:830)}}
>  
> Consumer
> -------------
> {{{{3XMTHREADINFO3           Java callstack:
> 4XESTACKTRACE                at java/nio/HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> 4XESTACKTRACE                at java/nio/ByteBuffer.allocate(ByteBuffer.java:335)
> 4XESTACKTRACE                at org/apache/kafka/common/memory/MemoryPool$1.tryAllocate(MemoryPool.java:30)
> 4XESTACKTRACE                at org/apache/kafka/common/network/NetworkReceive.readFrom(NetworkReceive.java:113)
> 4XESTACKTRACE                at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:475)
> 4XESTACKTRACE                at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:572)
> 4XESTACKTRACE                at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:250)
> 4XESTACKTRACE                at org/apache/kafka/common/network/KafkaChannel.prepare(KafkaChannel.java:181)
> 4XESTACKTRACE                at org/apache/kafka/common/network/Selector.pollSelectionKeys(Selector.java:543)
> 4XESTACKTRACE                at org/apache/kafka/common/network/Selector.poll(Selector.java:481)
> 4XESTACKTRACE                at org/apache/kafka/clients/NetworkClient.poll(NetworkClient.java:551)
> 4XESTACKTRACE                at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
> 4XESTACKTRACE                at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> 4XESTACKTRACE                at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
> 4XESTACKTRACE                at org/apache/kafka/clients/consumer/internals/Fetcher.getTopicMetadata(Fetcher.java:374)
> 4XESTACKTRACE                at org/apache/kafka/clients/consumer/KafkaConsumer.partitionsFor(KafkaConsumer.java:1949)
> 4XESTACKTRACE                at org/apache/kafka/clients/consumer/KafkaConsumer.partitionsFor(KafkaConsumer.java:1917)
> 4XESTACKTRACE                at com/ibm/broker/connector/kafka/KafkaIIBConsumer.initialise(KafkaIIBConsumer.java:177)
> 4XESTACKTRACE                at com/ibm/broker/connector/kafka/KafkaIIBConsumer.start(KafkaIIBConsumer.java:512)
> 5XESTACKTRACE                   (entered lock: com/ibm/broker/connector/kafka/KafkaIIBConsumer@0x00000000C0A94038, entry count: 1)
> 4XESTACKTRACE                at com/ibm/broker/connector/kafka/KafkaInputConnector.start(KafkaInputConnector.java:250)}}}}
>  
> We believe that when the Kafka server goes down, the route in the RHOS 
> environment remains available for a short period, and the 
> SaslClientAuthenticator receives rogue packets whose contents it interprets 
> as the length of a message to read off the stream.
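To make this failure mode concrete: Kafka responses are size-delimited, so the receiving side reads four bytes as a big-endian int and then allocates a buffer of that size. The sketch below is an illustration under stated assumptions, not Kafka's code. `SizePrefixDemo` and `readSizePrefix` are hypothetical names, and the "rogue" bytes are assumed to be the start of a plain-text HTTP response (one possibility if the route answers while the broker is down; the report does not say what the bytes actually were). The four ASCII characters `HTTP` decode to 1,213,486,160 bytes, which is suggestively close to the 1.2 GB buffers reported above but is not proof of the cause.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SizePrefixDemo {
    // Reads the first four bytes as a big-endian int, the way a size-prefixed
    // frame header is read (ByteBuffer uses big-endian order by default).
    static int readSizePrefix(byte[] firstBytes) {
        return ByteBuffer.wrap(firstBytes, 0, 4).getInt();
    }

    public static void main(String[] args) {
        // Assumption: the bytes arriving on the socket are an HTTP status line.
        byte[] rogue = "HTTP/1.1 503".getBytes(StandardCharsets.US_ASCII);
        int size = readSizePrefix(rogue);
        System.out.println(size);                                 // 1213486160
        System.out.println("about " + size / 1_000_000 + " MB");  // about 1213 MB
    }
}
```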
>  
> For the consumer, because application code is on the stack, we were able to 
> work around the problem by catching the OOM. On the producer side, however, 
> the entire stack is Kafka client code, so the application has no opportunity 
> to catch it.
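A minimal sketch of the consumer-side workaround, assuming the application wraps its own calls into the client (for example the `KafkaConsumer.partitionsFor` call on the consumer stack above). `OomGuard.call` and `ClientAllocationException` are hypothetical names, not part of the Kafka API, and the lambda stands in for the real client call so the sketch runs without kafka-clients on the classpath. The same approach cannot help on the producer side, where the allocation happens on the client's own `Sender` thread.

```java
import java.util.function.Supplier;

class OomGuard {
    // Hypothetical unchecked exception meaning "this client instance is broken".
    static final class ClientAllocationException extends RuntimeException {
        ClientAllocationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Runs one application-level client call and converts an OutOfMemoryError
    // thrown inside it into an ordinary exception the caller can handle.
    static <T> T call(String description, Supplier<T> clientCall) {
        try {
            return clientCall.get();
        } catch (OutOfMemoryError oom) {
            throw new ClientAllocationException(description + " failed with OutOfMemoryError", oom);
        }
    }

    public static void main(String[] args) {
        try {
            // Stand-in for consumer.partitionsFor(topic) hitting the oversized allocation.
            call("partitionsFor", () -> {
                throw new OutOfMemoryError("Java heap space");
            });
        } catch (ClientAllocationException e) {
            // In a real application: close this consumer and create a new one.
            System.out.println("recovered: " + e.getMessage());
        }
    }
}
```

Catching `OutOfMemoryError` is only defensible here because the failed request is a single oversized buffer that was never allocated, so the rest of the heap is typically still usable; even so, the client that hit it is best closed and recreated rather than reused.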
>  
> I looked at the SaslClientAuthenticator code and can see that its use of the 
> network buffer is unbounded, so I applied two patches to this code. The first 
> limits the buffer size for authentication to 10 MB; the second catches the 
> OOM and fails authentication instead.
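The first patch can be sketched as a bounds check on the size prefix before any allocation happens. This is a standalone illustration of the idea, not the attached patch or Kafka's code: `BoundedAuthReceive`, `MAX_AUTH_RECEIVE_BYTES`, `allocatePayload` and `AuthReceiveTooLargeException` are hypothetical names, and "10 MB" is taken here as 10 * 1024 * 1024 bytes, which is an assumption about the patch.

```java
import java.nio.ByteBuffer;

class BoundedAuthReceive {
    // Hypothetical cap mirroring the 10 MB limit proposed in this report.
    static final int MAX_AUTH_RECEIVE_BYTES = 10 * 1024 * 1024;

    // Hypothetical exception: the caller fails authentication instead of allocating.
    static final class AuthReceiveTooLargeException extends RuntimeException {
        AuthReceiveTooLargeException(String message) {
            super(message);
        }
    }

    // Reads the 4-byte size prefix from `header` and returns a payload buffer of
    // that size, refusing to allocate when the size is negative or above maxBytes.
    static ByteBuffer allocatePayload(ByteBuffer header, int maxBytes) {
        int size = header.getInt();
        if (size < 0 || size > maxBytes) {
            throw new AuthReceiveTooLargeException(
                    "Authentication response size " + size + " exceeds limit " + maxBytes);
        }
        return ByteBuffer.allocate(size);
    }

    public static void main(String[] args) {
        ByteBuffer sane = ByteBuffer.allocate(4).putInt(128);
        sane.flip();
        System.out.println(allocatePayload(sane, MAX_AUTH_RECEIVE_BYTES).capacity()); // 128

        // The ASCII bytes "HTTP" decode to 1213486160, far above the cap.
        ByteBuffer rogue = ByteBuffer.wrap(new byte[] {'H', 'T', 'T', 'P'});
        try {
            allocatePayload(rogue, MAX_AUTH_RECEIVE_BYTES);
        } catch (AuthReceiveTooLargeException e) {
            System.out.println("authentication failed: " + e.getMessage());
        }
    }
}
```

Rejecting the size before calling `ByteBuffer.allocate` turns a would-be OutOfMemoryError into an ordinary authentication failure, so for oversized prefixes at least, the OOM catch from the second patch would no longer be reached.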
>  
> With the patched client, the customer has gone from reproducing this on at 
> least one app server for every Kafka server restart to not being able to 
> reproduce the issue at all.
>  
> I am happy to submit a PR, but I wanted to get feedback first. For instance, 
> is 10 MB a suitable maximum buffer size for authentication? Should the 
> maximum be configurable instead, and if so, what is the best practice for 
> providing this configuration?
>  
> Secondly, catching the OOM doesn't feel like best practice to me. However, 
> without it the entire application fails because of the aggressive allocation 
> of byte buffers in the SaslClientAuthenticator. Is there any alternative I 
> should be considering?
>  
> I realise that this issue has been raised before, and that in the case of a 
> misconfiguration it was not considered a bug. In this instance, however, at 
> least in the customer's environment, the configuration is actually correct, 
> and the error leaves clients unable to tolerate a server failure.
>  
> I would appreciate any guidance you can give on how I can get a change 
> committed to prevent the problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
