junrao commented on code in PR #21019:
URL: https://github.com/apache/kafka/pull/21019#discussion_r2615794980


##########
clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java:
##########
@@ -455,695 +461,29 @@ public String documentation() {
         }
     };
 
-    public static final DocumentedType STRING = new DocumentedType() {
-        @Override
-        public void write(ByteBuffer buffer, Object o) {
-            byte[] bytes = Utils.utf8((String) o);
-            if (bytes.length > Short.MAX_VALUE)
-                throw new SchemaException("String length " + bytes.length + " 
is larger than the maximum string length.");
-            buffer.putShort((short) bytes.length);
-            buffer.put(bytes);
-        }
-
-        @Override
-        public String read(ByteBuffer buffer) {
-            short length = buffer.getShort();
-            if (length < 0)
-                throw new SchemaException("String length " + length + " cannot 
be negative");
-            if (length > buffer.remaining())
-                throw new SchemaException("Error reading string of length " + 
length + ", only " + buffer.remaining() + " bytes available");
-            String result = Utils.utf8(buffer, length);
-            buffer.position(buffer.position() + length);
-            return result;
-        }
-
-        @Override
-        public int sizeOf(Object o) {
-            return 2 + Utils.utf8Length((String) o);
-        }
-
-        @Override
-        public String typeName() {
-            return "STRING";
-        }
-
-        @Override
-        public String validate(Object item) {
-            if (item instanceof String)
-                return (String) item;
-            else
-                throw new SchemaException(item + " is not a String.");
-        }
-
-        @Override
-        public String documentation() {
-            return "Represents a sequence of characters. First the length N is 
given as an " + INT16 +
-                    ". Then N bytes follow which are the UTF-8 encoding of the 
character sequence. " +
-                    "Length must not be negative.";
-        }
-    };
-
-    public static final DocumentedType COMPACT_STRING = new DocumentedType() {
-        @Override
-        public void write(ByteBuffer buffer, Object o) {
-            byte[] bytes = Utils.utf8((String) o);
-            if (bytes.length > Short.MAX_VALUE)
-                throw new SchemaException("String length " + bytes.length + " 
is larger than the maximum string length.");
-            ByteUtils.writeUnsignedVarint(bytes.length + 1, buffer);
-            buffer.put(bytes);
-        }
-
-        @Override
-        public String read(ByteBuffer buffer) {
-            int length = ByteUtils.readUnsignedVarint(buffer) - 1;
-            if (length < 0)
-                throw new SchemaException("String length " + length + " cannot 
be negative");
-            if (length > Short.MAX_VALUE)
-                throw new SchemaException("String length " + length + " is 
larger than the maximum string length.");
-            if (length > buffer.remaining())
-                throw new SchemaException("Error reading string of length " + 
length + ", only " + buffer.remaining() + " bytes available");
-            String result = Utils.utf8(buffer, length);
-            buffer.position(buffer.position() + length);
-            return result;
-        }
-
-        @Override
-        public int sizeOf(Object o) {
-            int length = Utils.utf8Length((String) o);
-            return ByteUtils.sizeOfUnsignedVarint(length + 1) + length;
-        }
-
-        @Override
-        public String typeName() {
-            return "COMPACT_STRING";
-        }
-
-        @Override
-        public String validate(Object item) {
-            if (item instanceof String)
-                return (String) item;
-            else
-                throw new SchemaException(item + " is not a String.");
-        }
-
-        @Override
-        public String documentation() {
-            return "Represents a sequence of characters. First the length N + 
1 is given as an UNSIGNED_VARINT " +
-                    ". Then N bytes follow which are the UTF-8 encoding of the 
character sequence.";
-        }
-    };
-
-    public static final DocumentedType NULLABLE_STRING = new DocumentedType() {
-        @Override
-        public boolean isNullable() {
-            return true;
-        }
-
-        @Override
-        public void write(ByteBuffer buffer, Object o) {
-            if (o == null) {
-                buffer.putShort((short) -1);
-                return;
-            }
-
-            byte[] bytes = Utils.utf8((String) o);
-            if (bytes.length > Short.MAX_VALUE)
-                throw new SchemaException("String length " + bytes.length + " 
is larger than the maximum string length.");
-            buffer.putShort((short) bytes.length);
-            buffer.put(bytes);
-        }
-
-        @Override
-        public String read(ByteBuffer buffer) {
-            short length = buffer.getShort();
-            if (length < 0)
-                return null;
-            if (length > buffer.remaining())
-                throw new SchemaException("Error reading string of length " + 
length + ", only " + buffer.remaining() + " bytes available");
-            String result = Utils.utf8(buffer, length);
-            buffer.position(buffer.position() + length);
-            return result;
-        }
-
-        @Override
-        public int sizeOf(Object o) {
-            if (o == null)
-                return 2;
-
-            return 2 + Utils.utf8Length((String) o);
-        }
-
-        @Override
-        public String typeName() {
-            return "NULLABLE_STRING";
-        }
-
-        @Override
-        public String validate(Object item) {
-            if (item == null)
-                return null;
-
-            if (item instanceof String)
-                return (String) item;
-            else
-                throw new SchemaException(item + " is not a String.");
-        }
-
-        @Override
-        public String documentation() {
-            return "Represents a sequence of characters or null. For non-null 
strings, first the length N is given as an " + INT16 +
-                    ". Then N bytes follow which are the UTF-8 encoding of the 
character sequence. " +
-                    "A null value is encoded with length of -1 and there are 
no following bytes.";
-        }
-    };
-
-    public static final DocumentedType COMPACT_NULLABLE_STRING = new 
DocumentedType() {
-        @Override
-        public boolean isNullable() {
-            return true;
-        }
-
-        @Override
-        public void write(ByteBuffer buffer, Object o) {
-            if (o == null) {
-                ByteUtils.writeUnsignedVarint(0, buffer);
-            } else {
-                byte[] bytes = Utils.utf8((String) o);
-                if (bytes.length > Short.MAX_VALUE)
-                    throw new SchemaException("String length " + bytes.length 
+ " is larger than the maximum string length.");
-                ByteUtils.writeUnsignedVarint(bytes.length + 1, buffer);
-                buffer.put(bytes);
-            }
-        }
-
-        @Override
-        public String read(ByteBuffer buffer) {
-            int length = ByteUtils.readUnsignedVarint(buffer) - 1;
-            if (length < 0) {
-                return null;
-            } else if (length > Short.MAX_VALUE) {
-                throw new SchemaException("String length " + length + " is 
larger than the maximum string length.");
-            } else if (length > buffer.remaining()) {
-                throw new SchemaException("Error reading string of length " + 
length + ", only " + buffer.remaining() + " bytes available");
-            } else {
-                String result = Utils.utf8(buffer, length);
-                buffer.position(buffer.position() + length);
-                return result;
-            }
-        }
-
-        @Override
-        public int sizeOf(Object o) {
-            if (o == null) {
-                return 1;
-            }
-            int length = Utils.utf8Length((String) o);
-            return ByteUtils.sizeOfUnsignedVarint(length + 1) + length;
-        }
-
-        @Override
-        public String typeName() {
-            return "COMPACT_NULLABLE_STRING";
-        }
-
-        @Override
-        public String validate(Object item) {
-            if (item == null) {
-                return null;
-            } else if (item instanceof String) {
-                return (String) item;
-            } else {
-                throw new SchemaException(item + " is not a String.");
-            }
-        }
-
-        @Override
-        public String documentation() {
-            return "Represents a sequence of characters. First the length N + 
1 is given as an UNSIGNED_VARINT " +
-                    ". Then N bytes follow which are the UTF-8 encoding of the 
character sequence. " +
-                    "A null string is represented with a length of 0.";
-        }
-    };
-
-    public static final DocumentedType BYTES = new DocumentedType() {
-        @Override
-        public void write(ByteBuffer buffer, Object o) {
-            ByteBuffer arg = (ByteBuffer) o;
-            int pos = arg.position();
-            buffer.putInt(arg.remaining());
-            buffer.put(arg);
-            arg.position(pos);
-        }
-
-        @Override
-        public Object read(ByteBuffer buffer) {
-            int size = buffer.getInt();
-            if (size < 0)
-                throw new SchemaException("Bytes size " + size + " cannot be 
negative");
-            if (size > buffer.remaining())
-                throw new SchemaException("Error reading bytes of size " + 
size + ", only " + buffer.remaining() + " bytes available");
-
-            int limit = buffer.limit();
-            int newPosition = buffer.position() + size;
-            buffer.limit(newPosition);
-            ByteBuffer val = buffer.slice();
-            buffer.limit(limit);
-            buffer.position(newPosition);
-            return val;
-        }
-
-        @Override
-        public int sizeOf(Object o) {
-            ByteBuffer buffer = (ByteBuffer) o;
-            return 4 + buffer.remaining();
-        }
-
-        @Override
-        public String typeName() {
-            return "BYTES";
-        }
+    public static final DocumentedType STRING = new StringType();
 
-        @Override
-        public ByteBuffer validate(Object item) {
-            if (item instanceof ByteBuffer)
-                return (ByteBuffer) item;
-            else
-                throw new SchemaException(item + " is not a 
java.nio.ByteBuffer.");
-        }
+    public static final DocumentedType COMPACT_STRING = new CompactString();
 
-        @Override
-        public String documentation() {
-            return "Represents a raw sequence of bytes. First the length N is 
given as an " + INT32 +
-                    ". Then N bytes follow.";
-        }
-    };
+    public static final DocumentedType NULLABLE_STRING = new NullableString();
 
-    public static final DocumentedType COMPACT_BYTES = new DocumentedType() {
-        @Override
-        public void write(ByteBuffer buffer, Object o) {
-            ByteBuffer arg = (ByteBuffer) o;
-            int pos = arg.position();
-            ByteUtils.writeUnsignedVarint(arg.remaining() + 1, buffer);
-            buffer.put(arg);
-            arg.position(pos);
-        }
+    public static final DocumentedType COMPACT_NULLABLE_STRING = new 
CompactNullableString();
 
-        @Override
-        public Object read(ByteBuffer buffer) {
-            int size = ByteUtils.readUnsignedVarint(buffer) - 1;
-            if (size < 0)
-                throw new SchemaException("Bytes size " + size + " cannot be 
negative");
-            if (size > buffer.remaining())
-                throw new SchemaException("Error reading bytes of size " + 
size + ", only " + buffer.remaining() + " bytes available");
+    public static final DocumentedType BYTES = new Bytes();
 
-            int limit = buffer.limit();
-            int newPosition = buffer.position() + size;
-            buffer.limit(newPosition);
-            ByteBuffer val = buffer.slice();
-            buffer.limit(limit);
-            buffer.position(newPosition);
-            return val;
-        }
+    public static final DocumentedType COMPACT_BYTES = new CompactBytes();
 
-        @Override
-        public int sizeOf(Object o) {
-            ByteBuffer buffer = (ByteBuffer) o;
-            int remaining = buffer.remaining();
-            return ByteUtils.sizeOfUnsignedVarint(remaining + 1) + remaining;
-        }
+    public static final DocumentedType NULLABLE_BYTES = new NullableBytes();
 
-        @Override
-        public String typeName() {
-            return "COMPACT_BYTES";
-        }
+    public static final DocumentedType COMPACT_NULLABLE_BYTES = new 
CompactNullableBytes();
 
-        @Override
-        public ByteBuffer validate(Object item) {
-            if (item instanceof ByteBuffer)
-                return (ByteBuffer) item;
-            else
-                throw new SchemaException(item + " is not a 
java.nio.ByteBuffer.");
-        }
+    public static final DocumentedType RECORDS = new Records();
 
-        @Override
-        public String documentation() {
-            return "Represents a raw sequence of bytes. First the length N+1 
is given as an UNSIGNED_VARINT." +
-                    " Then N bytes follow.";
-        }
-    };
+    public static final DocumentedType COMPACT_RECORDS = new CompactRecords();
 
-    public static final DocumentedType NULLABLE_BYTES = new DocumentedType() {
-        @Override
-        public boolean isNullable() {
-            return true;
-        }
+    public static final DocumentedType NULLABLE_RECORDS = new 
NullableRecords();

Review Comment:
   For those types created as anonymous classes, it's probably better to just 
keep them as they are. To avoid code duplication, we could redirect the calls 
in the nullable version to the non-nullable one. For example, in 
NULLABLE_RECORDS.write(), we can call RECORDS.write(buffer, o).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to