Hi!

We're having a Kafka 3.4.1 cluster in use which we access through a Scala 
Library "zio-Kafka".

This lib itself uses the Java Client in Version 3.6.1

We have to authenticate with the broker through Kerberos/SAML.

It appears that the broker regularly queries LDAP to see if the token is still 
valid.

Sometimes this call is taking too long, and in our client we see this log 
message:

[Producer clientId=producer-1] Connection to node 1546333927 
(xxx.acme.corp/10.123.45.181:6668) terminated during authentication. This may 
happen due to any of the following reasons: (1) Authentication failed due to 
invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka 
TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.

The broker logs this

Potential performance problem: getGroups(user=yyacme_abc_krb) took 36492 
milliseconds.

I'm tasked with making our code robust against this kind of failure (including 
writing a test)

and thus have to see if I need to adjust our code or the zio-Kafka lib.

For creating a test reproducing the issue , I need to create a KafkaConsumer 
with a customized KafkaConsumerClient & KafkaClient/NetworkClient.

In the 3.6.1 codebase, the problem is now the usage of static imports which 
prevent me from overriding how clients are created, e.g.

https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L714

Looks like one could override how to create the consumerNetworkClient, but that 
is not possible, as "createConsumerNetworkClient" is not a method in 
KafkaConsumer but a statically imported method from ConsumerUtils.

Also all fields in KafkaConsumer are marked private, which prevent me from 
using them for just creating a custom KafkaConsumer using the testing 
constructor

https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L786

So KafkaConsumer#L714 calls into a static method in the helper class 
ConsumerUtils which itself again calls a static method in the ClientUtils 
class. This is making it very hard/seemingly impossible to swap out the 
ConsumerNetworkClient with a mock/stub.

I've seen that the general structure of KafkaConsumer has changed significantly 
in trunk,

with having a ConsumerDelegate as well as a ConsumerDelegateCreator.

However, it looks like the usage of static helper methods was just moved to 
these classes, making it now even harder to customize a KafkaConsumer.

Any pointers on how I can create a KafkaConsumer with a custom NetworkClient 
(where I can

control/simulate the above mentioned auth issue - after initial authentication 
worked) would be highly appreciated!

My next approach would be using reflection to get access to the fields - I 
would like to avoid that if somehow possible.

Thanks a lot!

Dominik

Dominik Dorn
https://dominikdorn.com/ | https://twitter.com/domdorn
XING: https://www.xing.com/profile/Dominik_Dorn
LINKEDIN: https://www.linkedin.com/in/dominik-dorn/

Reply via email to