Hi Kafka Developers,

I'd like to discuss something I've noticed about the generated
serialization code of the Kafka Protocol
<https://kafka.apache.org/protocol.html>.

I'm attempting to create a topic using the most recent KafkaAdminClient
implementation on maven
<https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/4.1.0>.
The CREATE_TOPICS
<https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/CreateTopicsRequest.json>
RPC specifies that configuration values are of type COMPACT_NULLABLE_STRING:

CreateTopics Request (Version: 7) => [topics] timeout_ms validate_only _tagged_fields
  topics => name num_partitions replication_factor [assignments] [configs] _tagged_fields
    name => COMPACT_STRING
    ...
    configs => name value _tagged_fields
      name => COMPACT_STRING
      value => COMPACT_NULLABLE_STRING
    ...

COMPACT_STRING is defined as follows:

> Represents a sequence of characters. First the length N + 1 is given as
> an UNSIGNED_VARINT. Then N bytes follow which are the UTF-8 encoding of
> the character sequence.
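
To make the definition concrete, here is a minimal sketch of the encoding as described above. It is not Kafka's own serializer: the class and method names are hypothetical, and it assumes UNSIGNED_VARINT uses the usual 7-bits-per-byte layout with a continuation bit.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class CompactStringSketch {
    // Assumed varint layout: 7 payload bits per byte, low-order group first,
    // high bit set on every byte except the last.
    public static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // COMPACT_STRING: length N + 1 as an unsigned varint, then N UTF-8 bytes.
    public static byte[] encode(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(utf8.length + 1, out);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // 1 length byte + 3 payload bytes
        System.out.println(encode("abc").length);                // prints 4
        // 3 length bytes + 524288 payload bytes
        System.out.println(encode("x".repeat(524_288)).length);  // prints 524291
    }
}
```

Under that assumption, the wire format itself has no trouble expressing a 512 KiB string: the length prefix simply grows to three bytes.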

I'm not sure whether a maximum size has been specified for UNSIGNED_VARINT.
So I assumed that these strings can have an arbitrary size.

When I build a CREATE_TOPICS request with the KafkaAdminClient:

String longValue = "x".repeat(524_288);
Map<String, String> newTopicConfig = new HashMap<>();
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, longValue);

and send the request, I end up with the following exception:

java.lang.RuntimeException: 'value' field is too long to be serialized
    at org.apache.kafka.common.message.CreateTopicsRequestData$CreatableTopicConfig.addSize(CreateTopicsRequestData.java:1219)
    at org.apache.kafka.common.message.CreateTopicsRequestData$CreatableTopic.addSize(CreateTopicsRequestData.java:576)
    at org.apache.kafka.common.message.CreateTopicsRequestData.addSize(CreateTopicsRequestData.java:207)
    at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
    at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
    at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:110)
    at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:608)
    at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:582)

It seems that the generated code of CreateTopicsRequestData contains:

byte[] _stringBytes = value.getBytes(StandardCharsets.UTF_8);
if (_stringBytes.length > 0x7fff) {
  throw new RuntimeException("'value' field is too long to be serialized");
}
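
For reference, a standalone sketch of what that bound means in practice. It mirrors only the comparison quoted above, not the full generated class; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

public class LengthCheckSketch {
    // Same constant as in the quoted check: 0x7fff == 32767 == Short.MAX_VALUE.
    public static final int MAX_LEN = 0x7fff;

    // Returns true if the UTF-8 encoding of value would pass the quoted check.
    public static boolean fits(String value) {
        return value.getBytes(StandardCharsets.UTF_8).length <= MAX_LEN;
    }

    public static void main(String[] args) {
        System.out.println(fits("x".repeat(32_767)));   // prints true
        System.out.println(fits("x".repeat(32_768)));   // prints false
        System.out.println(fits("x".repeat(524_288)));  // prints false
    }
}
```

So the effective limit is 32767 UTF-8 bytes, which the 524288-byte value in my example exceeds by a wide margin.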

This check is emitted by this generator function
<https://github.com/apache/kafka/blob/409a43eff77511e89bba2f95934cb1ebc417236d/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java#L1117>
and is what causes the exception.

Is it intended that RPCs using COMPACT_STRING should have this size limit?

Thanks!
