awelless commented on code in PR #10105: URL: https://github.com/apache/nifi/pull/10105#discussion_r2219269407
########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchemaParser.java: ########## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schema; + +import java.util.List; + +public interface ProtobufMessageSchemaParser { Review Comment: Nit: let's add docs for this interface. ########## nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaDefinition.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import org.apache.nifi.serialization.record.SchemaIdentifier; + +import java.util.Map; + +/** + * This acts as a data container for schema information retrieved from various schema registries. + */ +public interface SchemaDefinition { + + enum SchemaType { + AVRO, + PROTOBUF, + JSON Review Comment: Specifying all 3 formats can be misleading for other people, and they may assume all 3 formats are supported. Shall we limit ourselves to Protobuf only, while keeping the room for other formats? ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchema.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.confluent.schema; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Basic abstraction over Protobuf schema message entity. + * It contains only the fields that are needed for crude schema insights. It's useful when the + * message name resolver needs to pinpoint specific messages by message indexes encoded on the wire + * <p> + * https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format + * <p> + * It contains bare minimum of information for name resolver to be able to resolve message names + */ +public final class ProtobufMessageSchema { + + private final String name; + private final Optional<String> packageName; + private final List<ProtobufMessageSchema> childMessageSchemas = new ArrayList<>(); + + public ProtobufMessageSchema(final String name, final Optional<String> packageName) { + this.name = name; + this.packageName = packageName; + } + + public String name() { + return name; + } + + public Optional<String> packageName() { + return packageName; + } + + public boolean isDefaultPackage() { + return packageName.isEmpty(); + } + + public List<ProtobufMessageSchema> getChildMessageSchemas() { + return childMessageSchemas; + } + + void addChildMessage(final ProtobufMessageSchema protobufMessageSchema) { + childMessageSchemas.add(protobufMessageSchema); + } Review Comment: Nit: will it make sense to make `ProtobufMessageSchema` an immutable record and perform the list construction in a builder? ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-api/src/main/java/org/apache/nifi/confluent/schema/ProtobufMessageSchemaParser.java: ########## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schema; + +import java.util.List; + +public interface ProtobufMessageSchemaParser { Review Comment: This parser will be required for AWS Glue schema registry too. We can either create a separate module which will be shared between Confluent and AWS Glue; or we can keep it in the `confluent` package for now and move it outside when the need arises. This comment applies to everything in `nifi-confluent-platform-api` and `nifi-confluent-protobuf-antlr-parser` modules. ########## nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/StandardSchemaDefinition.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import org.apache.nifi.serialization.record.SchemaIdentifier; + +import java.util.Map; +import java.util.Objects; + +/** + * Simple implementation of SchemaDefinition that holds a schema identifier, text, and references. + */ +public class StandardSchemaDefinition implements SchemaDefinition { Review Comment: Same as for `StandardMessageName` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-antlr-parser/src/main/java/org/apache/nifi/confluent/schema/AntlrProtobufMessageSchemaParser.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.confluent.schema; + +import org.antlr.v4.runtime.BaseErrorListener; +import org.antlr.v4.runtime.CharStream; +import org.antlr.v4.runtime.CharStreams; +import org.antlr.v4.runtime.CommonTokenStream; +import org.antlr.v4.runtime.RecognitionException; +import org.antlr.v4.runtime.Recognizer; +import org.apache.nifi.confluent.schema.antlr.Protobuf3BaseVisitor; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Lexer; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Parser; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Parser.MessageDefContext; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Parser.PackageStatementContext; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Parser.ProtoContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Stack; + +/** + * Implementation of ProtobufMessageSchemaParser that uses ANTLR to parse protobuf schemas + * and extract message definitions including nested messages. 
+ */ +public class AntlrProtobufMessageSchemaParser implements ProtobufMessageSchemaParser { + + @Override + public List<ProtobufMessageSchema> parse(final String schemaText) { + + final CharStream input = CharStreams.fromString(schemaText); + final Protobuf3Lexer lexer = new Protobuf3Lexer(input); + final Protobuf3Parser parser = getProtobuf3Parser(lexer); + + final ProtoContext tree = parser.proto(); + + // Create visitor and analyze + final SchemaVisitor visitor = new SchemaVisitor(); + visitor.visit(tree); + + return visitor.getProtoMessages(); + + } + + private Protobuf3Parser getProtobuf3Parser(final Protobuf3Lexer lexer) { + final CommonTokenStream tokens = new CommonTokenStream(lexer); + final Protobuf3Parser parser = new Protobuf3Parser(tokens); + + // Add error listener to capture parsing errors + parser.removeErrorListeners(); + parser.addErrorListener(new BaseErrorListener() { + @Override + public void syntaxError(final Recognizer<?, ?> recognizer, final Object offendingSymbol, final int line, final int charPositionInLine, final String msg, final RecognitionException e) { + throw new RuntimeException(String.format("Syntax error at line %d, position %d: %s", line, charPositionInLine, msg), e); + } + }); + return parser; + } + + private static class SchemaVisitor extends Protobuf3BaseVisitor<Void> { + + private final List<ProtobufMessageSchema> rootMessages = new ArrayList<>(); + private final Stack<ProtobufMessageSchema> messageStack = new Stack<>(); + private String currentPackage; + + @Override + public Void visitPackageStatement(final PackageStatementContext ctx) { + if (ctx.fullIdent() != null) { + currentPackage = ctx.fullIdent().getText(); + } + return null; + } + + @Override + public Void visitMessageDef(final MessageDefContext ctx) { + final String messageName = ctx.messageName().getText(); + + final ProtobufMessageSchema protobufMessageSchema = new ProtobufMessageSchema(messageName, Optional.ofNullable(currentPackage)); + + // Add to parent's 
nested messages or root messages + if (messageStack.isEmpty()) { + rootMessages.add(protobufMessageSchema); + } else { + final ProtobufMessageSchema parent = messageStack.peek(); + parent.addChildMessage(protobufMessageSchema); + } + + messageStack.push(protobufMessageSchema); + // Visit nested messages + super.visitMessageDef(ctx); + messageStack.pop(); + + return null; + } + + public List<ProtobufMessageSchema> getProtoMessages() { + return rootMessages.stream().toList(); Review Comment: Nit: `List.copyOf()` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java: ########## @@ -211,6 +221,91 @@ public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotF return createRecordSchema(completeSchema); } + @Override + public SchemaDefinition getSchemaDefinition(SchemaIdentifier identifier) throws SchemaNotFoundException { + JsonNode schemaJson; + String subject = null; + Integer version = null; + + // If we have an ID, get the schema by ID first + // Using schemaVersionId, because that is what is set by ConfluentEncodedSchemaReferenceReader. + // probably identifier field should be used, but I'm not changing ConfluentEncodedSchemaReferenceReader for backward compatibility reasons. + if (identifier.getSchemaVersionId().isPresent()) { + long schemaId = identifier.getSchemaVersionId().getAsLong(); + String schemaPath = getSchemaPath(schemaId); + schemaJson = fetchJsonResponse(schemaPath, "id " + schemaId); + } else if (identifier.getName().isPresent()) { + // If we have a name or (name and version), get the schema by those + subject = identifier.getName().get(); + version = identifier.getVersion().isPresent() ? identifier.getVersion().getAsInt() : null; + // if no version was specified, the latest version will be used. See @getSubjectPath method. 
+ String pathSuffix = getSubjectPath(subject, version); + schemaJson = fetchJsonResponse(pathSuffix, "name " + subject); + } else { + throw new SchemaNotFoundException("Schema identifier must contain either a version identifier or a subject name"); + } + + // Extract schema information + String schemaText = schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText(); + String schemaTypeText = schemaJson.get(SCHEMA_TYPE_FIELD_NAME).asText(); + SchemaType schemaType = toSchemaType(schemaTypeText); + + long schemaId; + if (schemaJson.has(ID_FIELD_NAME)) { + schemaId = schemaJson.get(ID_FIELD_NAME).asLong(); + } else { + schemaId = identifier.getSchemaVersionId().getAsLong(); + } + + if (subject == null && schemaJson.has(SUBJECT_FIELD_NAME)) { + subject = schemaJson.get(SUBJECT_FIELD_NAME).asText(); + } + + if (version == null && schemaJson.has(VERSION_FIELD_NAME)) { + version = schemaJson.get(VERSION_FIELD_NAME).asInt(); + } + + // Build schema identifier with all available information + SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder() + .id(schemaId) + .name(subject) + .version(version) + .build(); + + // Process references if present + Map<String, SchemaDefinition> references = new HashMap<>(); + if (schemaJson.has(REFERENCES_FIELD_NAME) && !schemaJson.get(REFERENCES_FIELD_NAME).isNull()) { + ArrayNode refsArray = (ArrayNode) schemaJson.get(REFERENCES_FIELD_NAME); Review Comment: Let's verify that `schemaJson.get(REFERENCES_FIELD_NAME).isArray()` first. ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentProtobufMessageNameResolver.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.confluent.schema.AntlrProtobufMessageSchemaParser; +import org.apache.nifi.confluent.schema.ProtobufMessageSchema; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schemaregistry.services.MessageName; +import org.apache.nifi.schemaregistry.services.MessageNameResolver; +import org.apache.nifi.schemaregistry.services.SchemaDefinition; +import org.apache.nifi.schemaregistry.services.StandardMessageName; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static java.util.Collections.singletonList; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.decodeZigZag; +import static 
org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStream; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStreamAfterFirstByteConsumed; + +@Tags({"confluent", "schema", "registry", "protobuf", "message", "name", "resolver"}) +@CapabilityDescription(""" + Resolves Protobuf message names from Confluent Schema Registry wire format by decoding message indexes and looking up the fully qualified name in the schema definition + For Confluent wire format reference see: <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent Wire Format</a> + """) +public class ConfluentProtobufMessageNameResolver extends AbstractControllerService implements MessageNameResolver { + + public static final int MAXIMUM_SUPPORTED_ARRAY_LENGTH = 100; + private Cache<FindMessageNameArguments, MessageName> messageNameCache; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + messageNameCache = Caffeine.newBuilder().maximumSize(1000).expireAfterWrite(Duration.ofHours(1)).build(); + } + + @OnDisabled + public void onDisabled(final ConfigurationContext context) { + if (messageNameCache != null) { + messageNameCache.invalidateAll(); Review Comment: Nit: we can `messageNameCache = null` to drop any auxiliary data structures used in the cache. ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/test/java/org/apache/nifi/confluent/schemaregistry/VarintUtilsTest.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.decodeZigZag; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStream; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStreamAfterFirstByteConsumed; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.writeZigZagVarint; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class VarintUtilsTest { + + @Test + public void testReadVarintFromStream_SingleByte() throws IOException { + byte[] data = {0x08}; // 8 in varint format (0x08 = 00001000) + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(8, result); + } + + @Test + public void testReadVarintFromStream_MultiByte() throws IOException { + byte[] data = {(byte) 0x96, 0x01}; // 150 in varint format (10010110 00000001) + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(150, result); + } + + @Test + public void testReadVarintFromStream_MaxValue() throws IOException { + // Maximum 32-bit value in varint format (11111111 11111111 11111111 
11111111 00001111) + byte[] data = {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0x0F}; + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(0xFFFFFFFF, result); + } + + @Test + public void testReadVarintFromStream_WithFirstByte() throws IOException { + byte[] data = {0x08}; // 8 in varint format (0x08 = 00001000) + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStreamAfterFirstByteConsumed(inputStream, 0x08); + assertEquals(8, result); + } + + @Test + public void testReadVarintFromStream_Zero() throws IOException { + byte[] data = {0x00}; // 0 in varint format (0x00 = 00000000) + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(0, result); + } + + @Test + public void testReadVarintFromStream_LargeValue() throws IOException { + // 16384 in varint format (10000000 10000000 00000001) + byte[] data = {(byte) 0x80, (byte) 0x80, 0x01}; + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(16384, result); + } + + @Test + public void testReadVarintFromStream_EmptyStream() { + InputStream inputStream = new ByteArrayInputStream(new byte[0]); + + IOException exception = assertThrows(IOException.class, () -> readVarintFromStream(inputStream)); + assertTrue(exception.getMessage().contains("Unexpected end of stream while reading varint")); + } + + @Test + public void testReadVarintFromStream_TruncatedStream() { + // Start of a multi-byte varint but missing continuation bytes (0x80 = 10000000) + byte[] data = {(byte) 0x80}; // Indicates more bytes to follow but none provided + InputStream inputStream = new ByteArrayInputStream(data); + + IOException exception = assertThrows(IOException.class, () -> readVarintFromStream(inputStream)); + assertTrue(exception.getMessage().contains("Unexpected end of stream while 
reading varint")); + } + + @Test + public void testReadVarintFromStream_TooLong() { + // Varint with more than 32 bits (5 bytes with continuation bit set) + // (10000000 10000000 10000000 10000000 10000000 00000001) + byte[] data = {(byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, 0x01}; + InputStream inputStream = new ByteArrayInputStream(data); + + IOException exception = assertThrows(IOException.class, () -> readVarintFromStream(inputStream)); + assertTrue(exception.getMessage().contains("Varint too long (more than 32 bits)")); + } + + @Test + public void testDecodeZigZag_PositiveValues() { + // Test positive values + assertEquals(0, decodeZigZag(0)); // 0 -> 0 + assertEquals(1, decodeZigZag(2)); // 1 -> 2 + assertEquals(2, decodeZigZag(4)); // 2 -> 4 + assertEquals(3, decodeZigZag(6)); // 3 -> 6 + assertEquals(150, decodeZigZag(300)); // 150 -> 300 + } + + @Test + public void testDecodeZigZag_NegativeValues() { + // Test negative values + assertEquals(-1, decodeZigZag(1)); // -1 -> 1 + assertEquals(-2, decodeZigZag(3)); // -2 -> 3 + assertEquals(-3, decodeZigZag(5)); // -3 -> 5 + assertEquals(-150, decodeZigZag(299)); // -150 -> 299 + } + + @Test + public void testDecodeZigZag_ExtremeValues() { + // Test extreme values + assertEquals(Integer.MAX_VALUE, decodeZigZag(0xFFFFFFFE)); + assertEquals(Integer.MIN_VALUE, decodeZigZag(0xFFFFFFFF)); + } + + @Test + public void testDecodeZigZag_LargeValues() { + // Test some larger values to verify the algorithm + assertEquals(1000, decodeZigZag(2000)); // 1000 -> 2000 + assertEquals(-1000, decodeZigZag(1999)); // -1000 -> 1999 + } Review Comment: Nit: these tests can be neatly replaced with a single `@ParameterizedTest` using `@CsvSource` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentProtobufMessageNameResolver.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.confluent.schema.AntlrProtobufMessageSchemaParser; +import org.apache.nifi.confluent.schema.ProtobufMessageSchema; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schemaregistry.services.MessageName; +import org.apache.nifi.schemaregistry.services.MessageNameResolver; +import org.apache.nifi.schemaregistry.services.SchemaDefinition; +import org.apache.nifi.schemaregistry.services.StandardMessageName; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static 
java.lang.String.format; +import static java.util.Collections.singletonList; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.decodeZigZag; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStream; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStreamAfterFirstByteConsumed; + +@Tags({"confluent", "schema", "registry", "protobuf", "message", "name", "resolver"}) +@CapabilityDescription(""" + Resolves Protobuf message names from Confluent Schema Registry wire format by decoding message indexes and looking up the fully qualified name in the schema definition + For Confluent wire format reference see: <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent Wire Format</a> + """) +public class ConfluentProtobufMessageNameResolver extends AbstractControllerService implements MessageNameResolver { + + public static final int MAXIMUM_SUPPORTED_ARRAY_LENGTH = 100; + private Cache<FindMessageNameArguments, MessageName> messageNameCache; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + messageNameCache = Caffeine.newBuilder().maximumSize(1000).expireAfterWrite(Duration.ofHours(1)).build(); + } + + @OnDisabled + public void onDisabled(final ConfigurationContext context) { + if (messageNameCache != null) { + messageNameCache.invalidateAll(); + } + } + + + @Override + public MessageName getMessageName(@NotNull final SchemaDefinition schemaDefinition, final InputStream inputStream) throws IOException { + final ComponentLog logger = getLogger(); + + // Read message indexes directly from stream (Confluent wire format) + final List<Integer> messageIndexes = readMessageIndexesFromStream(inputStream); + + logger.debug("Decoded message indexes: {}", messageIndexes); + + final FindMessageNameArguments findMessageNameArgs = new FindMessageNameArguments(schemaDefinition, messageIndexes); + + return 
messageNameCache.get(findMessageNameArgs, this::findMessageName); + + } + + /** + * Reads message indexes directly from the input stream using Confluent wire format. + * Format: [array_length:varint][index1:varint][index2:varint]...[indexN:varint] + * Special case: single 0 byte means first message (index 0) + * <p> + * <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Wire format</a> + * + * @param inputStream the input stream positioned after the Confluent header (magic byte + schema ID) + * @return list of message indexes + * @throws IOException if unable to read from stream or invalid format + */ + private @NotNull List<Integer> readMessageIndexesFromStream(final InputStream inputStream) throws IOException { + // Special case: check if first byte is 0 (most common case) + final int firstByte = inputStream.read(); + if (firstByte == -1) { + throw new IOException("Unexpected end of stream while reading message indexes"); + } + + if (firstByte == 0) { + // Single 0 byte means first message type (index 0) + return List.of(0); + } + + // General case: read array length as varint + int arrayLength = readVarintFromStreamAfterFirstByteConsumed(inputStream, firstByte); + arrayLength = decodeZigZag(arrayLength); Review Comment: `readVarint` is always followed by `decodeZigZag`. Will it make sense to incorporate these 2 in the `VarintUtils`? 
########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java: ########## @@ -47,21 +52,30 @@ public CachingSchemaRegistryClient(final SchemaRegistryClient toWrap, final int .maximumSize(cacheSize) .expireAfterWrite(Duration.ofNanos(expirationNanos)) .build(client::getSchema); + definitionCache = Caffeine.newBuilder() + .maximumSize(cacheSize) + .expireAfterWrite(Duration.ofNanos(expirationNanos)) + .build(client::getSchemaDefinition); } @Override - public RecordSchema getSchema(final String schemaName) { + public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException { Review Comment: Hm, since we're using caffeine, all [checked exceptions are wrapped](https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java#L192) into a `CompletionException`. So we don't actually throw any IO or Schema Not Found exception from this class. Perhaps, we should go through `CachingSchemaRegistryClient` implementations to ensure no original schema related exceptions are wrapped. (not in this pr) ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/test/java/org/apache/nifi/confluent/schemaregistry/VarintUtilsTest.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.decodeZigZag; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStream; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStreamAfterFirstByteConsumed; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.writeZigZagVarint; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class VarintUtilsTest { + + @Test + public void testReadVarintFromStream_SingleByte() throws IOException { + byte[] data = {0x08}; // 8 in varint format (0x08 = 00001000) + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(8, result); + } + + @Test + public void testReadVarintFromStream_MultiByte() throws IOException { + byte[] data = {(byte) 0x96, 0x01}; // 150 in varint format (10010110 00000001) + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(150, result); + } + + @Test + public void testReadVarintFromStream_MaxValue() throws IOException { + // Maximum 32-bit value in varint format (11111111 11111111 11111111 
11111111 00001111) + byte[] data = {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0x0F}; + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(0xFFFFFFFF, result); + } + + @Test + public void testReadVarintFromStream_WithFirstByte() throws IOException { + byte[] data = {0x08}; // 8 in varint format (0x08 = 00001000) + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStreamAfterFirstByteConsumed(inputStream, 0x08); + assertEquals(8, result); + } + + @Test + public void testReadVarintFromStream_Zero() throws IOException { + byte[] data = {0x00}; // 0 in varint format (0x00 = 00000000) + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(0, result); + } + + @Test + public void testReadVarintFromStream_LargeValue() throws IOException { + // 16384 in varint format (10000000 10000000 00000001) + byte[] data = {(byte) 0x80, (byte) 0x80, 0x01}; + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(16384, result); + } + + @Test + public void testReadVarintFromStream_EmptyStream() { + InputStream inputStream = new ByteArrayInputStream(new byte[0]); + + IOException exception = assertThrows(IOException.class, () -> readVarintFromStream(inputStream)); + assertTrue(exception.getMessage().contains("Unexpected end of stream while reading varint")); + } + + @Test + public void testReadVarintFromStream_TruncatedStream() { + // Start of a multi-byte varint but missing continuation bytes (0x80 = 10000000) + byte[] data = {(byte) 0x80}; // Indicates more bytes to follow but none provided + InputStream inputStream = new ByteArrayInputStream(data); + + IOException exception = assertThrows(IOException.class, () -> readVarintFromStream(inputStream)); + assertTrue(exception.getMessage().contains("Unexpected end of stream while 
reading varint")); + } + + @Test + public void testReadVarintFromStream_TooLong() { + // Varint with more than 32 bits (5 bytes with continuation bit set) + // (10000000 10000000 10000000 10000000 10000000 00000001) + byte[] data = {(byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, 0x01}; + InputStream inputStream = new ByteArrayInputStream(data); + + IOException exception = assertThrows(IOException.class, () -> readVarintFromStream(inputStream)); + assertTrue(exception.getMessage().contains("Varint too long (more than 32 bits)")); + } + + @Test + public void testDecodeZigZag_PositiveValues() { + // Test positive values + assertEquals(0, decodeZigZag(0)); // 0 -> 0 + assertEquals(1, decodeZigZag(2)); // 1 -> 2 + assertEquals(2, decodeZigZag(4)); // 2 -> 4 + assertEquals(3, decodeZigZag(6)); // 3 -> 6 + assertEquals(150, decodeZigZag(300)); // 150 -> 300 + } + + @Test + public void testDecodeZigZag_NegativeValues() { + // Test negative values + assertEquals(-1, decodeZigZag(1)); // -1 -> 1 + assertEquals(-2, decodeZigZag(3)); // -2 -> 3 + assertEquals(-3, decodeZigZag(5)); // -3 -> 5 + assertEquals(-150, decodeZigZag(299)); // -150 -> 299 + } + + @Test + public void testDecodeZigZag_ExtremeValues() { + // Test extreme values + assertEquals(Integer.MAX_VALUE, decodeZigZag(0xFFFFFFFE)); + assertEquals(Integer.MIN_VALUE, decodeZigZag(0xFFFFFFFF)); + } + + @Test + public void testDecodeZigZag_LargeValues() { + // Test some larger values to verify the algorithm + assertEquals(1000, decodeZigZag(2000)); // 1000 -> 2000 + assertEquals(-1000, decodeZigZag(1999)); // -1000 -> 1999 + } + + @Test + public void testWriteZigZagVarint_Zero() throws IOException { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + writeZigZagVarint(output, 0); + byte[] result = output.toByteArray(); + + assertEquals(1, result.length); + assertEquals(0x00, result[0]); + } + + @Test + public void testWriteZigZagVarint_PositiveNumber() throws IOException { + 
ByteArrayOutputStream output = new ByteArrayOutputStream(); + writeZigZagVarint(output, 150); + byte[] result = output.toByteArray(); + + assertEquals(2, result.length); + assertEquals((byte) 0xAC, result[0]); // Lower 7 bits of 300 with continuation bit + assertEquals(0x02, result[1]); // Upper bits of 300 (150 -> 300 zigzag) + } + + @Test + public void testWriteZigZagVarint_NegativeNumber() throws IOException { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + writeZigZagVarint(output, -1); + byte[] result = output.toByteArray(); + + assertEquals(1, result.length); + assertEquals(0x01, result[0]); // -1 -> 1 zigzag + } + + @Test + public void testWriteZigZagVarint_LargePositiveNumber() throws IOException { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + writeZigZagVarint(output, 1000); + byte[] result = output.toByteArray(); + + assertEquals(2, result.length); + assertEquals((byte) 0xD0, result[0]); // Lower 7 bits of 2000 with continuation bit + assertEquals(0x0F, result[1]); // Upper bits of 2000 (1000 -> 2000 zigzag) + } + + @Test + public void testWriteZigZagVarint_LargeNegativeNumber() throws IOException { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + writeZigZagVarint(output, -150); + byte[] result = output.toByteArray(); + + assertEquals(2, result.length); + assertEquals((byte) 0xAB, result[0]); // Lower 7 bits of 299 with continuation bit + assertEquals(0x02, result[1]); // Upper bits of 299 (-150 -> 299 zigzag) + } + + @Test + public void testWriteZigZagVarint_ExtremeValues() throws IOException { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + writeZigZagVarint(output, Integer.MAX_VALUE); + byte[] result = output.toByteArray(); + + assertEquals(5, result.length); + + output = new ByteArrayOutputStream(); + writeZigZagVarint(output, Integer.MIN_VALUE); + result = output.toByteArray(); + + assertEquals(5, result.length); + } + + @Test + public void testWriteZigZagVarint_MultipleValues() throws 
IOException { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + // Write multiple values + writeZigZagVarint(output, 0); + writeZigZagVarint(output, 1); + writeZigZagVarint(output, -1); + writeZigZagVarint(output, 150); + + byte[] result = output.toByteArray(); + + // Verify we have the expected number of bytes + assertTrue(result.length > 4); // At least one byte per value + + // Read back and verify + ByteArrayInputStream input = new ByteArrayInputStream(result); + + assertEquals(0, decodeZigZag(readVarintFromStream(input))); + assertEquals(1, decodeZigZag(readVarintFromStream(input))); + assertEquals(-1, decodeZigZag(readVarintFromStream(input))); + assertEquals(150, decodeZigZag(readVarintFromStream(input))); Review Comment: Nit: these tests can be replaced with a single `@ParameterizedTest` with `@MethodSource` and `assertArrayEquals` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-antlr-parser/src/main/java/org/apache/nifi/confluent/schema/AntlrProtobufMessageSchemaParser.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.confluent.schema; + +import org.antlr.v4.runtime.BaseErrorListener; +import org.antlr.v4.runtime.CharStream; +import org.antlr.v4.runtime.CharStreams; +import org.antlr.v4.runtime.CommonTokenStream; +import org.antlr.v4.runtime.RecognitionException; +import org.antlr.v4.runtime.Recognizer; +import org.apache.nifi.confluent.schema.antlr.Protobuf3BaseVisitor; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Lexer; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Parser; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Parser.MessageDefContext; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Parser.PackageStatementContext; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Parser.ProtoContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Stack; + +/** + * Implementation of ProtobufMessageSchemaParser that uses ANTLR to parse protobuf schemas + * and extract message definitions including nested messages. 
+ */ +public class AntlrProtobufMessageSchemaParser implements ProtobufMessageSchemaParser { + + @Override + public List<ProtobufMessageSchema> parse(final String schemaText) { + + final CharStream input = CharStreams.fromString(schemaText); + final Protobuf3Lexer lexer = new Protobuf3Lexer(input); + final Protobuf3Parser parser = getProtobuf3Parser(lexer); + + final ProtoContext tree = parser.proto(); + + // Create visitor and analyze + final SchemaVisitor visitor = new SchemaVisitor(); + visitor.visit(tree); + + return visitor.getProtoMessages(); + + } + + private Protobuf3Parser getProtobuf3Parser(final Protobuf3Lexer lexer) { + final CommonTokenStream tokens = new CommonTokenStream(lexer); + final Protobuf3Parser parser = new Protobuf3Parser(tokens); + + // Add error listener to capture parsing errors + parser.removeErrorListeners(); + parser.addErrorListener(new BaseErrorListener() { + @Override + public void syntaxError(final Recognizer<?, ?> recognizer, final Object offendingSymbol, final int line, final int charPositionInLine, final String msg, final RecognitionException e) { + throw new RuntimeException(String.format("Syntax error at line %d, position %d: %s", line, charPositionInLine, msg), e); + } + }); + return parser; + } + + private static class SchemaVisitor extends Protobuf3BaseVisitor<Void> { + + private final List<ProtobufMessageSchema> rootMessages = new ArrayList<>(); + private final Stack<ProtobufMessageSchema> messageStack = new Stack<>(); + private String currentPackage; + + @Override + public Void visitPackageStatement(final PackageStatementContext ctx) { + if (ctx.fullIdent() != null) { + currentPackage = ctx.fullIdent().getText(); + } + return null; + } + + @Override + public Void visitMessageDef(final MessageDefContext ctx) { + final String messageName = ctx.messageName().getText(); + + final ProtobufMessageSchema protobufMessageSchema = new ProtobufMessageSchema(messageName, Optional.ofNullable(currentPackage)); Review Comment: Nit: 
let's run `Optional.ofNullable(currentPackage)` only once, in `visitPackageStatement` ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Utility class for reading, writing and decoding varint values from input streams. + * This class provides methods for reading variable-length integers (varints), + * writing zigzag-encoded varints, and decoding zigzag-encoded integers as per the Protocol Buffers specification. + */ +public final class VarintUtils { + + private VarintUtils() { } + + /** + * Reads a single varint from the input stream. + * + * <p>Variable-length integers (varints) are a method of serializing integers using one or more bytes. + * Smaller numbers take fewer bytes. Each byte in a varint, except the last byte, has the most + * significant bit set – this indicates that there are further bytes to come. 
The lower 7 bits + * of each byte are used to store the two's complement representation of the number in groups of 7 bits, + * least significant group first.</p> + * + * <p>For more information about varint encoding, see: + * <a href="https://en.wikipedia.org/wiki/Variable-length_quantity">Variable-length quantity - Wikipedia</a></p> + * + * <p>This implementation follows the Protocol Buffers varint encoding format as described in: + * <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">Protocol Buffers Encoding</a></p> Review Comment: Nit: this part of the documentation is duplicated for both `read*`methods. The class itself seems to be a better place for that. ########## nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/StandardMessageName.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.services; + +import java.util.Objects; +import java.util.Optional; + +public class StandardMessageName implements MessageName { Review Comment: Is there any benefit in introducing an interface for `MessageName` provided that `StandardMessageName` is in the API module as well? ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-antlr-parser/src/main/java/org/apache/nifi/confluent/schema/AntlrProtobufMessageSchemaParser.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.confluent.schema; + +import org.antlr.v4.runtime.BaseErrorListener; +import org.antlr.v4.runtime.CharStream; +import org.antlr.v4.runtime.CharStreams; +import org.antlr.v4.runtime.CommonTokenStream; +import org.antlr.v4.runtime.RecognitionException; +import org.antlr.v4.runtime.Recognizer; +import org.apache.nifi.confluent.schema.antlr.Protobuf3BaseVisitor; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Lexer; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Parser; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Parser.MessageDefContext; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Parser.PackageStatementContext; +import org.apache.nifi.confluent.schema.antlr.Protobuf3Parser.ProtoContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Stack; + +/** + * Implementation of ProtobufMessageSchemaParser that uses ANTLR to parse protobuf schemas + * and extract message definitions including nested messages. 
+ */ +public class AntlrProtobufMessageSchemaParser implements ProtobufMessageSchemaParser { + + @Override + public List<ProtobufMessageSchema> parse(final String schemaText) { + + final CharStream input = CharStreams.fromString(schemaText); + final Protobuf3Lexer lexer = new Protobuf3Lexer(input); + final Protobuf3Parser parser = getProtobuf3Parser(lexer); + + final ProtoContext tree = parser.proto(); + + // Create visitor and analyze + final SchemaVisitor visitor = new SchemaVisitor(); + visitor.visit(tree); + + return visitor.getProtoMessages(); + + } + + private Protobuf3Parser getProtobuf3Parser(final Protobuf3Lexer lexer) { + final CommonTokenStream tokens = new CommonTokenStream(lexer); + final Protobuf3Parser parser = new Protobuf3Parser(tokens); + + // Add error listener to capture parsing errors + parser.removeErrorListeners(); + parser.addErrorListener(new BaseErrorListener() { + @Override + public void syntaxError(final Recognizer<?, ?> recognizer, final Object offendingSymbol, final int line, final int charPositionInLine, final String msg, final RecognitionException e) { + throw new RuntimeException(String.format("Syntax error at line %d, position %d: %s", line, charPositionInLine, msg), e); + } + }); + return parser; + } + + private static class SchemaVisitor extends Protobuf3BaseVisitor<Void> { + + private final List<ProtobufMessageSchema> rootMessages = new ArrayList<>(); + private final Stack<ProtobufMessageSchema> messageStack = new Stack<>(); Review Comment: `Stack` is a very old synchronized collection. We can use `ArrayDeque` instead. ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/VarintUtils.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Utility class for reading, writing and decoding varint values from input streams. + * This class provides methods for reading variable-length integers (varints), + * writing zigzag-encoded varints, and decoding zigzag-encoded integers as per the Protocol Buffers specification. + */ +public final class VarintUtils { Review Comment: 1. This class can be package-private. 2. Let's name it `VarintCodec` or similar. `Utils` doesn't really tell what's going on here. ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentProtobufMessageNameResolver.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.confluent.schemaregistry; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.confluent.schema.AntlrProtobufMessageSchemaParser; +import org.apache.nifi.confluent.schema.ProtobufMessageSchema; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schemaregistry.services.MessageName; +import org.apache.nifi.schemaregistry.services.MessageNameResolver; +import org.apache.nifi.schemaregistry.services.SchemaDefinition; +import org.apache.nifi.schemaregistry.services.StandardMessageName; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static java.util.Collections.singletonList; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.decodeZigZag; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStream; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStreamAfterFirstByteConsumed; + 
+@Tags({"confluent", "schema", "registry", "protobuf", "message", "name", "resolver"}) +@CapabilityDescription(""" + Resolves Protobuf message names from Confluent Schema Registry wire format by decoding message indexes and looking up the fully qualified name in the schema definition + For Confluent wire format reference see: <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent Wire Format</a> + """) +public class ConfluentProtobufMessageNameResolver extends AbstractControllerService implements MessageNameResolver { + + public static final int MAXIMUM_SUPPORTED_ARRAY_LENGTH = 100; + private Cache<FindMessageNameArguments, MessageName> messageNameCache; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + messageNameCache = Caffeine.newBuilder().maximumSize(1000).expireAfterWrite(Duration.ofHours(1)).build(); + } + + @OnDisabled + public void onDisabled(final ConfigurationContext context) { + if (messageNameCache != null) { + messageNameCache.invalidateAll(); + } + } + + + @Override + public MessageName getMessageName(@NotNull final SchemaDefinition schemaDefinition, final InputStream inputStream) throws IOException { + final ComponentLog logger = getLogger(); + + // Read message indexes directly from stream (Confluent wire format) + final List<Integer> messageIndexes = readMessageIndexesFromStream(inputStream); + + logger.debug("Decoded message indexes: {}", messageIndexes); + + final FindMessageNameArguments findMessageNameArgs = new FindMessageNameArguments(schemaDefinition, messageIndexes); + + return messageNameCache.get(findMessageNameArgs, this::findMessageName); + + } + + /** + * Reads message indexes directly from the input stream using Confluent wire format. 
+ * Format: [array_length:varint][index1:varint][index2:varint]...[indexN:varint] + * Special case: single 0 byte means first message (index 0) + * <p> + * <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Wire format</a> + * + * @param inputStream the input stream positioned after the Confluent header (magic byte + schema ID) + * @return list of message indexes + * @throws IOException if unable to read from stream or invalid format + */ + private @NotNull List<Integer> readMessageIndexesFromStream(final InputStream inputStream) throws IOException { + // Special case: check if first byte is 0 (most common case) + final int firstByte = inputStream.read(); + if (firstByte == -1) { + throw new IOException("Unexpected end of stream while reading message indexes"); + } + + if (firstByte == 0) { + // Single 0 byte means first message type (index 0) + return List.of(0); + } + + // General case: read array length as varint + int arrayLength = readVarintFromStreamAfterFirstByteConsumed(inputStream, firstByte); + arrayLength = decodeZigZag(arrayLength); + + if (arrayLength < 0 || arrayLength > MAXIMUM_SUPPORTED_ARRAY_LENGTH) { // Reasonable limit + throw new IllegalStateException("Invalid message index array length: " + arrayLength); + } + + // Read each index as varint + final List<Integer> indexes = new ArrayList<>(); + for (int i = 0; i < arrayLength; i++) { + final int rawIndex = readVarintFromStream(inputStream); + final int index = decodeZigZag(rawIndex); + indexes.add(index); + } + + return indexes; + } + + /** + * Finds the fully qualified message name in the protobuf schema using the message indexes. 
+ */ + private @NotNull MessageName findMessageName(final FindMessageNameArguments findMessageNameArguments) { + try { + final List<Integer> messageIndexes = findMessageNameArguments.messageIndexes(); + final String schemaText = findMessageNameArguments.schemaDefinition().text(); + // Parse the protobuf schema using AntlrProtobufMessageSchemaParser + final AntlrProtobufMessageSchemaParser reader = new AntlrProtobufMessageSchemaParser(); + final List<ProtobufMessageSchema> rootMessages = reader.parse(schemaText); + + if (messageIndexes.isEmpty()) { + // Return the topmost root message name + if (!rootMessages.isEmpty()) { + return getFullyQualifiedName(singletonList(rootMessages.getFirst())); + } else { + throw new IllegalStateException("No root messages found in schema"); + } + } + + // Navigate through the message hierarchy using indexes + ProtobufMessageSchema currentMessage; + List<ProtobufMessageSchema> currentLevel = rootMessages; + final List<ProtobufMessageSchema> messagePath = new ArrayList<>(); + + for (final int index : messageIndexes) { + if (index >= currentLevel.size()) { + final String msg = format("Message index %d out of bounds for level with %d messages. Message indexes: [%s]", index, currentLevel.size(), messageIndexes); + getLogger().error(msg); + throw new IllegalStateException(msg); Review Comment: Nit: shall we log the message only in the `IllegalStateException` handling logic? ########## nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/StandardMessageName.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import java.util.Objects; +import java.util.Optional; + +public class StandardMessageName implements MessageName { Review Comment: Nit: this can be a record. ########## nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java: ########## @@ -45,4 +45,32 @@ public interface SchemaRegistry extends ControllerService { * @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #retrieveSchema(SchemaIdentifier)} */ Set<SchemaField> getSuppliedSchemaFields(); + + /** + * Retrieves the raw schema definition including its textual representation and references. + * <p> + * This method is used to retrieve the complete schema definition structure, including the raw schema text + * and any schema references. Unlike {@link #retrieveSchema(SchemaIdentifier)}, which returns a parsed + * {@link RecordSchema} ready for immediate use, this method returns a {@link SchemaDefinition} containing + * the raw schema content that can be used for custom schema processing, compilation, or when schema + * references need to be resolved. 
+ * </p> + * <p> + * This method is particularly useful for: + * <ul> + * <li>Processing schemas that reference other schemas (e.g., Protocol Buffers with imports)</li> + * <li>Custom schema compilation workflows where the raw schema text is needed</li> + * <li>Accessing schema metadata and references for advanced schema processing</li> + * </ul> + * </p> + * + * @param schemaIdentifier the schema identifier containing id, name, version, and optionally branch information + * @return a {@link SchemaDefinition} containing the raw schema text, type, identifier, and references + * @throws IOException if unable to communicate with the backing store + * @throws SchemaNotFoundException if unable to find the schema based on the given identifier + * @throws UnsupportedOperationException if the schema registry implementation does not support raw schema retrieval + */ + default SchemaDefinition retrieveSchemaRaw(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { Review Comment: Extreme nitpicking: `retrieveRawSchema`. ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java: ########## @@ -243,7 +338,7 @@ private String getSubjectPath(final String schemaName, final Integer schemaVersi (schemaVersion == null ? "latest" : URLEncoder.encode(String.valueOf(schemaVersion), StandardCharsets.UTF_8)); } - private String getSchemaPath(final int schemaId) { + private String getSchemaPath(final long schemaId) { Review Comment: Let's keep the parameter as `int` and cast the id to `int` in `getSchemaDefinition` instead. Otherwise it looks confusing that Confluent ids, which are always 4 bytes, are treated as longs. 
########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentProtobufMessageNameResolver.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.confluent.schemaregistry; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.confluent.schema.AntlrProtobufMessageSchemaParser; +import org.apache.nifi.confluent.schema.ProtobufMessageSchema; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schemaregistry.services.MessageName; +import org.apache.nifi.schemaregistry.services.MessageNameResolver; +import org.apache.nifi.schemaregistry.services.SchemaDefinition; +import org.apache.nifi.schemaregistry.services.StandardMessageName; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static java.util.Collections.singletonList; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.decodeZigZag; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStream; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStreamAfterFirstByteConsumed; + +@Tags({"confluent", "schema", "registry", "protobuf", "message", "name", "resolver"}) +@CapabilityDescription(""" + Resolves Protobuf message names from Confluent Schema Registry wire format by decoding message indexes and looking up the fully qualified name in the schema definition + For Confluent wire format reference see: <a 
href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent Wire Format</a> + """) +public class ConfluentProtobufMessageNameResolver extends AbstractControllerService implements MessageNameResolver { + + public static final int MAXIMUM_SUPPORTED_ARRAY_LENGTH = 100; + private Cache<FindMessageNameArguments, MessageName> messageNameCache; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + messageNameCache = Caffeine.newBuilder().maximumSize(1000).expireAfterWrite(Duration.ofHours(1)).build(); Review Comment: Nit: let's move cache size to the constants as well. Also, just wondering, will it make sense to make this a configurable property, so the cache size can be decreased for memory-constrained environments? ########## nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/MessageNameResolver.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.services; + +import org.apache.nifi.controller.ControllerService; + +import java.io.IOException; +import java.io.InputStream; + +/** + * A service interface for resolving message names from schema definitions and input streams. + * This interface is typically used in scenarios where message types need to be determined + * dynamically from the content of the message and the associated schema definition. + * <p> + * Implementations of this interface can be used to extract or derive message names from + * various sources such as message headers, content, or schema metadata, enabling proper + * message processing and routing based on the resolved message type. + * </p> + */ +public interface MessageNameResolver extends ControllerService { + + /** + * Resolves and returns the message name based on the provided schema definition and input stream. + * <p> + * This method analyzes the given schema definition and input stream to determine the appropriate + * message name. The resolution strategy depends on the specific implementation and may involve + * parsing message headers, analyzing message content, or consulting schema metadata. Review Comment: With the existing interface this isn't possible to pass any additional message headers. We can pass an additional `Map<String, String> variables` parameter, as done in [SchemaReferenceReader](https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaReferenceReader.java#L42). This isn't needed for Confluent, but will be useful for other schema registries, e.g. Apicurio. 
########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-antlr-parser/src/main/resources/Protobuf3.g4: ########## Review Comment: If I remember correctly, @exceptionfactory agreed with @lkuchars that this file should be fetched during build instead of copying it into the repo. ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentProtobufMessageNameResolver.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.confluent.schemaregistry; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.confluent.schema.AntlrProtobufMessageSchemaParser; +import org.apache.nifi.confluent.schema.ProtobufMessageSchema; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schemaregistry.services.MessageName; +import org.apache.nifi.schemaregistry.services.MessageNameResolver; +import org.apache.nifi.schemaregistry.services.SchemaDefinition; +import org.apache.nifi.schemaregistry.services.StandardMessageName; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static java.util.Collections.singletonList; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.decodeZigZag; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStream; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStreamAfterFirstByteConsumed; + +@Tags({"confluent", "schema", "registry", "protobuf", "message", "name", "resolver"}) +@CapabilityDescription(""" + Resolves Protobuf message names from Confluent Schema Registry wire format by decoding message indexes and looking up the fully qualified name in the schema definition + For Confluent wire format reference see: <a 
href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent Wire Format</a> + """) +public class ConfluentProtobufMessageNameResolver extends AbstractControllerService implements MessageNameResolver { + + public static final int MAXIMUM_SUPPORTED_ARRAY_LENGTH = 100; + private Cache<FindMessageNameArguments, MessageName> messageNameCache; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + messageNameCache = Caffeine.newBuilder().maximumSize(1000).expireAfterWrite(Duration.ofHours(1)).build(); + } + + @OnDisabled + public void onDisabled(final ConfigurationContext context) { + if (messageNameCache != null) { + messageNameCache.invalidateAll(); + } + } + + + @Override + public MessageName getMessageName(@NotNull final SchemaDefinition schemaDefinition, final InputStream inputStream) throws IOException { + final ComponentLog logger = getLogger(); + + // Read message indexes directly from stream (Confluent wire format) + final List<Integer> messageIndexes = readMessageIndexesFromStream(inputStream); + + logger.debug("Decoded message indexes: {}", messageIndexes); + + final FindMessageNameArguments findMessageNameArgs = new FindMessageNameArguments(schemaDefinition, messageIndexes); + + return messageNameCache.get(findMessageNameArgs, this::findMessageName); + + } + + /** + * Reads message indexes directly from the input stream using Confluent wire format. 
+ * Format: [array_length:varint][index1:varint][index2:varint]...[indexN:varint] + * Special case: single 0 byte means first message (index 0) + * <p> + * <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Wire format</a> + * + * @param inputStream the input stream positioned after the Confluent header (magic byte + schema ID) + * @return list of message indexes + * @throws IOException if unable to read from stream or invalid format + */ + private @NotNull List<Integer> readMessageIndexesFromStream(final InputStream inputStream) throws IOException { + // Special case: check if first byte is 0 (most common case) + final int firstByte = inputStream.read(); + if (firstByte == -1) { + throw new IOException("Unexpected end of stream while reading message indexes"); + } + + if (firstByte == 0) { + // Single 0 byte means first message type (index 0) + return List.of(0); + } + + // General case: read array length as varint + int arrayLength = readVarintFromStreamAfterFirstByteConsumed(inputStream, firstByte); + arrayLength = decodeZigZag(arrayLength); + + if (arrayLength < 0 || arrayLength > MAXIMUM_SUPPORTED_ARRAY_LENGTH) { // Reasonable limit + throw new IllegalStateException("Invalid message index array length: " + arrayLength); + } + + // Read each index as varint + final List<Integer> indexes = new ArrayList<>(); + for (int i = 0; i < arrayLength; i++) { + final int rawIndex = readVarintFromStream(inputStream); + final int index = decodeZigZag(rawIndex); + indexes.add(index); + } + + return indexes; + } + + /** + * Finds the fully qualified message name in the protobuf schema using the message indexes. 
+ */ + private @NotNull MessageName findMessageName(final FindMessageNameArguments findMessageNameArguments) { + try { + final List<Integer> messageIndexes = findMessageNameArguments.messageIndexes(); + final String schemaText = findMessageNameArguments.schemaDefinition().text(); + // Parse the protobuf schema using AntlrProtobufMessageSchemaParser + final AntlrProtobufMessageSchemaParser reader = new AntlrProtobufMessageSchemaParser(); + final List<ProtobufMessageSchema> rootMessages = reader.parse(schemaText); + + if (messageIndexes.isEmpty()) { + // Return the topmost root message name + if (!rootMessages.isEmpty()) { + return getFullyQualifiedName(singletonList(rootMessages.getFirst())); + } else { + throw new IllegalStateException("No root messages found in schema"); + } + } + + // Navigate through the message hierarchy using indexes + ProtobufMessageSchema currentMessage; + List<ProtobufMessageSchema> currentLevel = rootMessages; + final List<ProtobufMessageSchema> messagePath = new ArrayList<>(); + + for (final int index : messageIndexes) { + if (index >= currentLevel.size()) { + final String msg = format("Message index %d out of bounds for level with %d messages. Message indexes: [%s]", index, currentLevel.size(), messageIndexes); + getLogger().error(msg); + throw new IllegalStateException(msg); + } + + currentMessage = currentLevel.get(index); + messagePath.add(currentMessage); + + // Move to nested messages of the current message + currentLevel = currentMessage.getChildMessageSchemas(); + } + + // Return the fully qualified name including parent message hierarchy + return getFullyQualifiedName(messagePath); + + } catch (final Exception e) { + throw new IllegalStateException("Failed to parse protobuf schema", e); + } + } + + /** + * Gets the fully qualified name for a message path, including parent message names. 
+ */ + private @NotNull MessageName getFullyQualifiedName(@NotNull final List<ProtobufMessageSchema> messagePath) { + final ProtobufMessageSchema firstMessage = messagePath.getFirst(); + + final String fullName = messagePath.stream() + .map(ProtobufMessageSchema::name) + .collect(Collectors.joining(".")); + + return new StandardMessageName(firstMessage.packageName(), fullName); + } + + record FindMessageNameArguments(SchemaDefinition schemaDefinition, List<Integer> messageIndexes) { Review Comment: Nit: this can be private. ########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-antlr-parser/pom.xml: ########## @@ -0,0 +1,94 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-confluent-platform-bundle</artifactId> + <version>2.5.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-confluent-protobuf-antlr-parser</artifactId> + <packaging>jar</packaging> + + <properties> + <antlr4.version>4.13.1</antlr4.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr4-runtime</artifactId> + <version>${antlr4.version}</version> Review Comment: Since we're adding a new dependency, `nifi-confluent-platform-nar` should mention it in the LICENSE file. [Apache guideline](https://infra.apache.org/licensing-howto.html#permissive-deps). ########## nifi-extension-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java: ########## @@ -45,4 +45,32 @@ public interface SchemaRegistry extends ControllerService { * @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #retrieveSchema(SchemaIdentifier)} */ Set<SchemaField> getSuppliedSchemaFields(); + + /** + * Retrieves the raw schema definition including its textual representation and references. + * <p> + * This method is used to retrieve the complete schema definition structure, including the raw schema text + * and any schema references. Unlike {@link #retrieveSchema(SchemaIdentifier)}, which returns a parsed + * {@link RecordSchema} ready for immediate use, this method returns a {@link SchemaDefinition} containing + * the raw schema content that can be used for custom schema processing, compilation, or when schema + * references need to be resolved. 
+ * </p> + * <p> + * This method is particularly useful for: + * <ul> + * <li>Processing schemas that reference other schemas (e.g., Protocol Buffers with imports)</li> + * <li>Custom schema compilation workflows where the raw schema text is needed</li> + * <li>Accessing schema metadata and references for advanced schema processing</li> + * </ul> + * </p> + * + * @param schemaIdentifier the schema identifier containing id, name, version, and optionally branch information + * @return a {@link SchemaDefinition} containing the raw schema text, type, identifier, and references + * @throws IOException if unable to communicate with the backing store + * @throws SchemaNotFoundException if unable to find the schema based on the given identifier + * @throws UnsupportedOperationException if the schema registry implementation does not support raw schema retrieval + */ + default SchemaDefinition retrieveSchemaRaw(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { + throw new UnsupportedOperationException("retrieveSchemaRaw is not supported by this SchemaRegistry implementation"); Review Comment: Extending the interface seems more reasonable than creating a new hierarchy for schema registries. However, I'm personally not very happy with throwing an exception in a default implementation. When a NiFi user configures schema registry, there is no way for them to know whether their use case is supported or not. Offhand, to make the experience better we can return a set of supported "features" which will be checked in the respective reader. If a provided schema registry doesn't support raw schemas, the reader validation should fail. Feedback from NiFi maintainers will be appreciated here. 
########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-protobuf-message-name-resolver/src/test/java/org/apache/nifi/confluent/schemaregistry/VarintUtilsTest.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.confluent.schemaregistry; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.decodeZigZag; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStream; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.readVarintFromStreamAfterFirstByteConsumed; +import static org.apache.nifi.confluent.schemaregistry.VarintUtils.writeZigZagVarint; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class VarintUtilsTest { + + @Test + public void testReadVarintFromStream_SingleByte() throws IOException { + byte[] data = {0x08}; // 8 in varint format (0x08 = 00001000) + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(8, result); + } + + @Test + public void testReadVarintFromStream_MultiByte() throws IOException { + byte[] data = {(byte) 0x96, 0x01}; // 150 in varint format (10010110 00000001) + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(150, result); + } + + @Test + public void testReadVarintFromStream_MaxValue() throws IOException { + // Maximum 32-bit value in varint format (11111111 11111111 11111111 11111111 00001111) + byte[] data = {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0x0F}; + InputStream inputStream = new ByteArrayInputStream(data); + + int result = readVarintFromStream(inputStream); + assertEquals(0xFFFFFFFF, result); + } + + @Test + public void testReadVarintFromStream_WithFirstByte() throws IOException { Review Comment: Let's also add a test where the first byte has a continuation bit set. 
########## nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClientTest.java: ########## @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.confluent.schemaregistry.client; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaDefinition; +import org.apache.nifi.schemaregistry.services.SchemaDefinition.SchemaType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.client.WireMock.verify; +import static java.lang.String.format; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static 
org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +@ExtendWith(MockitoExtension.class) +class RestSchemaRegistryClientTest { + + public static final String CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"; + private static final int SCHEMA_ID = 123; + private static final int SCHEMA_VERSION = 1; + private static final String SUBJECT_NAME = "test-subject"; + private static final int REFERENCED_SCHEMA_VERSION = 2; + private static final int REFERENCED_SCHEMA_ID = 123; + private static final String REFERENCED_SUBJECT_NAME = "referenced-subject"; + private static final String REFERENCED_SCHEMA_NAME = "common.proto"; + private static final String PROTOBUF = "PROTOBUF"; + private static final String AVRO_SCHEMA_TEXT = """ + { + "type": "record", + "name": "User", + "fields": [ + { + "name": "id", + "type": "int" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "address", + "type": { + "type": "record", + "name": "Address", + "fields": [ + { "name": "street", "type": "string" }, + { "name": "city", "type": "string" }, + { "name": "zip", "type": "int" } + ] + } + } + ] + }"""; + private static final String PROTOBUF_SCHEMA_TEXT = """ + syntax = "proto3"; + + package example; + + import "common.proto"; + + message User { + int32 id = 1; + string name = 2; + string email = 3; + Address address = 4; + google.protobuf.Timestamp created_at = 5; + common.Status status = 6; + } + + message Address { + string street = 1; + string city = 2; + string state = 3; + string zip_code = 4; + string country = 5; + }"""; + private static final String REFERENCED_SCHEMA_TEXT = """ + syntax = "proto3"; + + package common; + + enum Status { + UNKNOWN = 0; + ACTIVE = 1; + INACTIVE = 2; + PENDING = 3; + }"""; + private WireMockServer wireMockServer; + private RestSchemaRegistryClient client; + private String baseUrl; + private ObjectMapper objectMapper; + + @Mock + private ComponentLog logger; + + @BeforeEach + void 
setUp() { + wireMockServer = new WireMockServer(WireMockConfiguration.wireMockConfig().port(0)); + wireMockServer.start(); + + WireMock.configureFor("localhost", wireMockServer.port()); + baseUrl = "http://localhost:" + wireMockServer.port(); + + client = new RestSchemaRegistryClient(List.of(baseUrl), 30000, null, null, null, logger, Map.of()); + objectMapper = new ObjectMapper(); + } + + @AfterEach + void tearDown() { + if (wireMockServer != null) { + wireMockServer.stop(); + } + } + + @Test + void testGetSchemaByIdWithoutSubjectAndVersionInfo() throws IOException, SchemaNotFoundException { + // Mock the primary schema endpoint + stubSchemaByIdEndpoint(SCHEMA_ID, AVRO_SCHEMA_TEXT); + + // Mock all other endpoints to return 404 (not supported or no data) + stubEndpointNotFound("/schemas/ids/" + SCHEMA_ID + "/subjects"); + stubEndpointNotFound("/schemas/ids/" + SCHEMA_ID + "/versions"); + stubEndpointNotFound("/subjects"); + + RecordSchema schema = client.getSchema(SCHEMA_ID); + + // Verify the result (should still work but without subject/version info) + assertNotNull(schema); + assertFalse(schema.getIdentifier().getName().isPresent()); + assertEquals(SCHEMA_ID, schema.getIdentifier().getIdentifier().getAsLong()); + assertFalse(schema.getIdentifier().getVersion().isPresent()); + assertTrue(schema.getSchemaText().isPresent()); + + // Verify that the expected endpoints were called + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID))); + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID + "/subjects"))); + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID + "/versions"))); + verify(getRequestedFor(urlEqualTo("/subjects"))); + } + + + @Test + void testGetSchemaByIdWithSubjectsEndpointSupport() throws IOException, SchemaNotFoundException { + stubSchemaByIdEndpoint(SCHEMA_ID, AVRO_SCHEMA_TEXT); + + // Mock the subjects endpoint (v5.3.1+ feature) + stubSubjectsEndpoint(SCHEMA_ID, SUBJECT_NAME); + + 
stubCompleteSchemaEndpoint(SUBJECT_NAME, SCHEMA_ID, SCHEMA_VERSION, AVRO_SCHEMA_TEXT); + + RecordSchema schema = client.getSchema(SCHEMA_ID); + + assertNotNull(schema); + assertEquals(SUBJECT_NAME, schema.getIdentifier().getName().get()); + assertEquals(SCHEMA_ID, schema.getIdentifier().getIdentifier().getAsLong()); + assertEquals(SCHEMA_VERSION, schema.getIdentifier().getVersion().getAsInt()); + assertTrue(schema.getSchemaText().isPresent()); + + // Verify that the expected endpoints were called + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID))); + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID + "/subjects"))); + verify(postRequestedFor(urlEqualTo("/subjects/" + SUBJECT_NAME))); + } + + @Test + void testGetSchemaByIdWithVersionsEndpointFallback() throws IOException, SchemaNotFoundException { + stubSchemaByIdEndpoint(SCHEMA_ID, AVRO_SCHEMA_TEXT); + + stubEndpointNotFound("/schemas/ids/" + SCHEMA_ID + "/subjects"); + stubVersionsEndpoint(SCHEMA_ID, SUBJECT_NAME, SCHEMA_VERSION); + + RecordSchema schema = client.getSchema(SCHEMA_ID); + + assertNotNull(schema); + assertEquals(SUBJECT_NAME, schema.getIdentifier().getName().get()); + assertEquals(SCHEMA_ID, schema.getIdentifier().getIdentifier().getAsLong()); + assertEquals(SCHEMA_VERSION, schema.getIdentifier().getVersion().getAsInt()); + assertTrue(schema.getSchemaText().isPresent()); + + // Verify that the expected endpoints were called + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID))); + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID + "/subjects"))); + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID + "/versions"))); + } + + @Test + void testGetSchemaByIdWithAllSubjectsFallback() throws IOException, SchemaNotFoundException { + stubSchemaByIdEndpoint(SCHEMA_ID, AVRO_SCHEMA_TEXT); + stubEndpointNotFound("/schemas/ids/" + SCHEMA_ID + "/subjects"); + stubEndpointNotFound("/schemas/ids/" + SCHEMA_ID + "/versions"); + 
stubAllSubjectsEndpoint(SUBJECT_NAME, "other-subject"); + stubCompleteSchemaEndpoint(SUBJECT_NAME, SCHEMA_ID, SCHEMA_VERSION, AVRO_SCHEMA_TEXT); + stubPostEndpointNotFound("/subjects/other-subject"); + + RecordSchema schema = client.getSchema(SCHEMA_ID); + + assertNotNull(schema); + assertEquals(SUBJECT_NAME, schema.getIdentifier().getName().get()); + assertEquals(SCHEMA_ID, schema.getIdentifier().getIdentifier().getAsLong()); + assertEquals(SCHEMA_VERSION, schema.getIdentifier().getVersion().getAsInt()); + assertTrue(schema.getSchemaText().isPresent()); + + // Verify that the expected endpoints were called + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID))); + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID + "/subjects"))); + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID + "/versions"))); + verify(getRequestedFor(urlEqualTo("/subjects"))); + verify(postRequestedFor(urlEqualTo("/subjects/" + SUBJECT_NAME))); + } + + + @Test + void testGetSchemaByIdNotFound() { + + stubEndpointNotFound("/schemas/ids/" + SCHEMA_ID); + assertThrows(SchemaNotFoundException.class, () -> client.getSchema(SCHEMA_ID)); + + // Verify that the expected endpoint was called + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID))); + } + + @Test + void testGetSchemaByIdServerError() { + + stubEndpointServerError("/schemas/ids/" + SCHEMA_ID, "Internal Server Error"); + assertThrows(SchemaNotFoundException.class, () -> client.getSchema(SCHEMA_ID)); + + // Verify that the expected endpoint was called + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID))); + } + + @Test + void testGetSchemaByIdInvalidAvroSchema() throws JsonProcessingException { + stubSchemaByIdEndpoint(SCHEMA_ID, "invalid-avro-schema"); + stubEndpointNotFound("/schemas/ids/" + SCHEMA_ID + "/subjects"); + + assertThrows(SchemaNotFoundException.class, () -> client.getSchema(SCHEMA_ID)); + + verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID))); + 
verify(getRequestedFor(urlEqualTo("/schemas/ids/" + SCHEMA_ID + "/subjects"))); + } + + @Test + void testGetSchemaDefinitionWithProtobufAndReferences() throws IOException, SchemaNotFoundException { + SchemaReference reference = new SchemaReference(REFERENCED_SCHEMA_NAME, REFERENCED_SUBJECT_NAME, REFERENCED_SCHEMA_VERSION); + stubSchemaByIdEndpointWithReferences(SCHEMA_ID, PROTOBUF_SCHEMA_TEXT, List.of(reference)); + stubSchemaBySubjectVersionEndpoint(REFERENCED_SCHEMA_ID, REFERENCED_SUBJECT_NAME, REFERENCED_SCHEMA_TEXT, REFERENCED_SCHEMA_VERSION); + + SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) SCHEMA_ID).build(); + + SchemaDefinition schemaDefinition = client.getSchemaDefinition(schemaIdentifier); + + assertNotNull(schemaDefinition); + + final SchemaIdentifier identifier = schemaDefinition.identifier(); + assertTrue(identifier.getName().isEmpty()); + assertTrue(identifier.getVersion().isEmpty()); + + assertEquals(SCHEMA_ID, identifier.getIdentifier().getAsLong()); + assertEquals(SchemaType.PROTOBUF, schemaDefinition.getSchemaType()); + assertEquals(PROTOBUF_SCHEMA_TEXT, schemaDefinition.text()); + + // Verify that references were fetched + final Map<String, SchemaDefinition> references = schemaDefinition.getReferences(); + assertNotNull(references); + assertEquals(1, references.size()); + SchemaDefinition referencedSchema = references.get(REFERENCED_SCHEMA_NAME); + assertNotNull(referencedSchema); + final SchemaIdentifier referencedId = referencedSchema.identifier(); + assertEquals(REFERENCED_SUBJECT_NAME, referencedId.getName().get()); + assertEquals(REFERENCED_SCHEMA_ID, referencedId.getIdentifier().getAsLong()); + assertEquals(REFERENCED_SCHEMA_VERSION, referencedId.getVersion().getAsInt()); Review Comment: Let's also verify that `REFERENCED_SCHEMA_TEXT` was returned in the `referencedSchema` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected]
