[
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lukas Kucharski reassigned NIFI-14424:
--------------------------------------
Assignee: Lukas Kucharski (was: Alaksiej Ščarbaty)
> Support schema registries in ProtobufReader
> -------------------------------------------
>
> Key: NIFI-14424
> URL: https://issues.apache.org/jira/browse/NIFI-14424
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Alaksiej Ščarbaty
> Assignee: Lukas Kucharski
> Priority: Major
>
> Protobuf record parsing is currently handled by the
> [ProtobufReader|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L58],
> which supports proto files on a [local filesystem
> only|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L157].
> This change aims to support schema registries as a source of proto
> definitions.
> Since the original proto files are required for parsing, new
> abstractions related to schemas will be introduced.
> h2. SchemaDefinition
> A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text and a
> collection of referenced {_}SchemaDefinitions{_}.
> h2. SchemaDefinitionProvider
> New _SchemaDefinitionProviders_ provide a simple interface for retrieving
> schemas as text, together with their references. Unlike their
> _SchemaRegistry_ counterparts, they do no parsing, and it’s up to the
> caller to decide what to do with the returned text.
> The interface will be put into the _nifi-schema-registry-service-api_ module,
> whereas the implementations should be created in their respective modules,
> such as _nifi-confluent-schema-registry-service_ for Confluent.
> h2. MessageNameResolver
> In the case of Protobuf, a single proto file may contain multiple message
> entries. To determine which message to use for decoding, a message index is
> usually encoded in the payload. As with a schema reference, different
> systems encode this index differently.
> _MessageNameResolver_ will be a new _ControllerService_ that translates
> an encoded Protobuf message index into a Protobuf message name, which is
> then used to find the correct Protobuf message for deserialization.
> Similar to the _SchemaDefinitionProvider_, the interface will be put into the
> _nifi-schema-registry-service-api_ module. Schema-registry-dependent
> implementations will land in their respective modules, e.g.
> _nifi-confluent-schema-registry-service_ for Confluent.
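The translation from a message index to a message name can be sketched as below. This is a minimal illustration, not the proposed implementation: it assumes the index has already been decoded from the payload into a path of positions (one per nesting level), which is a registry-specific step not shown here. The class name `MessageNameResolution` and the method `resolve` are hypothetical; the `Message` record mirrors the one in the appendix. The proto package prefix is ignored.

```java
import java.util.List;

final class MessageNameResolution {

    // Mirrors the Message record from the appendix: a name plus nested messages.
    record Message(String name, List<Message> nestedMessages) {}

    // Walks the index path through the message tree and returns the
    // dot-separated message name. Hypothetical helper, for illustration only.
    static String resolve(List<Message> topLevel, List<Integer> indexPath) {
        if (indexPath.isEmpty()) {
            throw new IllegalArgumentException("index path must not be empty");
        }
        List<Message> candidates = topLevel;
        StringBuilder name = new StringBuilder();
        for (int index : indexPath) {
            if (index < 0 || index >= candidates.size()) {
                throw new IllegalArgumentException("no message at index " + index);
            }
            Message current = candidates.get(index);
            if (name.length() > 0) {
                name.append('.');
            }
            name.append(current.name());
            // Descend one level: the next index refers to the nested messages.
            candidates = current.nestedMessages();
        }
        return name.toString();
    }

    public static void main(String[] args) {
        List<Message> messages = List.of(
                new Message("Header", List.of()),
                new Message("Order", List.of(new Message("Item", List.of()))));
        System.out.println(resolve(messages, List.of(1, 0))); // prints Order.Item
    }
}
```

Here the path `[1, 0]` selects the second top-level message and then its first nested message.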
> h2. ProtobufReader
> Since the existing ProtobufReader extends {_}SchemaRegistryService{_}, we’ll
> conform to that interface and add a new {_}Schema Access Strategy{_}:
> {_}SCHEMA_DEFINITION_PROVIDER{_}.
> The _createRecordReader_ method will be split into two branches: one for
> _SCHEMA_DEFINITION_PROVIDER_ (new) and one for the rest (the existing logic).
> The _getSupportedPropertyDescriptors_ method will be modified to define the
> new properties, and to make _SCHEMA_REFERENCE_READER_ depend on the
> _SCHEMA_DEFINITION_PROVIDER_ value.
> Attribute modifications:
>
> |*Attribute Name*|*Description*|*Value*|*Comment*|
> |Schema Reference Reader|Service implementation responsible for reading
> FlowFile attributes or content to determine the Schema Reference
> Identifier.|<{_}SchemaReferenceReader{_}> impl|That’s going to be the
> existing property in SchemaRegistryService.|
> |Schema Definition Provider|A provider that will retrieve Protobuf schema
> definitions.|<{_}SchemaDefinitionProvider{_}> impl|This is a new property,
> available only when _Schema Access Strategy_ is _SCHEMA_DEFINITION_PROVIDER._|
> |Message Name Resolver|Service implementation responsible for reading
> FlowFile attributes or content to determine the name of a Protobuf message to
> use for deserialization.|<{_}MessageNameResolver{_}> impl|This is a new
> property, available only when _Schema Access Strategy_ is
> _SCHEMA_DEFINITION_PROVIDER._|
> |Proto Directory, Message Type| | |*These properties will be hidden when*
> *_Schema Access Strategy_* *is* *_SCHEMA_DEFINITION_PROVIDER._*
> *Currently they’re mandatory.* |
> |_Schema Caching parameters, like size and expiration_| | |These are new
> properties, available only when _Schema Access Strategy_ is
> _SCHEMA_DEFINITION_PROVIDER._|
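The schema caching parameters in the last table row (size and expiration) could work roughly as in the sketch below. This is only an assumption about the intended behavior: the class `CachingLookup` and the parameter names `maxSize` and `expirationMillis` are hypothetical, and the loader function stands in for a call such as `SchemaDefinitionProvider.getById`. The clock is injected so that expiration can be exercised without waiting.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

final class CachingLookup<K, V> {

    // A cached value together with the time it was loaded.
    private record Cached<T>(T value, long loadedAtMillis) {}

    private final Function<K, V> loader;
    private final long expirationMillis;
    private final LongSupplier clock;
    private final Map<K, Cached<V>> cache;

    CachingLookup(Function<K, V> loader, int maxSize, long expirationMillis, LongSupplier clock) {
        this.loader = loader;
        this.expirationMillis = expirationMillis;
        this.clock = clock;
        // An access-ordered LinkedHashMap drops the least recently used entry
        // once the configured size is exceeded.
        this.cache = new LinkedHashMap<K, Cached<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Cached<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    synchronized V get(K key) {
        long now = clock.getAsLong();
        Cached<V> entry = cache.get(key);
        // Reload when the entry is missing or older than the expiration time.
        if (entry == null || now - entry.loadedAtMillis() >= expirationMillis) {
            entry = new Cached<>(loader.apply(key), now);
            cache.put(key, entry);
        }
        return entry.value();
    }
}
```

A reader could wrap its provider once, for example `new CachingLookup<>(provider::getById, 100, 60_000, System::currentTimeMillis)`, where `provider` is whatever _SchemaDefinitionProvider_ is configured.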
> h3. Custom Protobuf Loader
> Wire’s
> [SchemaLoader|https://github.com/square/wire/blob/master/wire-schema/src/jsMain/kotlin/com/squareup/wire/schema/SchemaLoader.kt]
> can’t be used with {_}SchemaDefinitionProviders{_}, as it’s tightly coupled to
> a file system. Instead of implementing a file system adapter, custom
> glue code will be implemented, which either uses a _SchemaDefinitionProvider_
> for loading or accepts an already fetched _SchemaDefinition_. See
> [CommonSchemaLoader|https://github.com/square/wire/blob/master/wire-schema/src/commonMain/kotlin/com/squareup/wire/schema/internal/CommonSchemaLoader.kt]
> for inspiration.
> This logic will reside in _nifi-protobuf-services_ too.
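One possible first step for such glue code is to flatten an already fetched definition and its references into a map of proto sources that a parser can then consume. The sketch below shows only that step and makes several assumptions: the `SchemaDefinition` record is a stand-in for the one in the appendix with the identifier simplified to a `String`, the reference keys are treated as import paths, and `SchemaFlattening`, `flatten` and `rootName` are hypothetical names. Parsing the collected sources with Wire is not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SchemaFlattening {

    // Stand-in for the appendix record; the id is simplified to a String here.
    record SchemaDefinition(String id, String text, Map<String, SchemaDefinition> references) {}

    // Collects the root schema and all transitively referenced schemas,
    // keyed by the name under which each one is imported.
    static Map<String, String> flatten(String rootName, SchemaDefinition root) {
        Map<String, String> sources = new LinkedHashMap<>();
        collect(rootName, root, sources);
        return sources;
    }

    private static void collect(String name, SchemaDefinition definition, Map<String, String> sources) {
        // putIfAbsent keeps the first text seen for a name and avoids
        // revisiting a schema that several parents reference.
        if (sources.putIfAbsent(name, definition.text()) != null) {
            return;
        }
        definition.references().forEach((refName, ref) -> collect(refName, ref, sources));
    }

    public static void main(String[] args) {
        SchemaDefinition money = new SchemaDefinition(
                "2", "syntax = \"proto3\"; message Money {}", Map.of());
        SchemaDefinition order = new SchemaDefinition(
                "1", "syntax = \"proto3\"; import \"common/money.proto\"; message Order {}",
                Map.of("common/money.proto", money));
        // prints [order.proto, common/money.proto]
        System.out.println(flatten("order.proto", order).keySet());
    }
}
```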
> h2. Appendix: Code examples
> h3. SchemaDefinitionProvider
> {{record SchemaDefinition(}}
> {{ SchemaIdentifier id,}}
> {{ String text,}}
> {{ // key: the name by which the schema is referenced from its parent}}
> {{ Map<String, SchemaDefinition> references}}
> {{) {}}}
> {{interface SchemaDefinitionProvider {}}
> {{ SchemaDefinition getById(SchemaIdentifier id);}}
> {{}}}
> {{class ConfluentSchemaDefinitionProvider implements SchemaDefinitionProvider {}}
> {{ // ...}}
> {{}}}
> h3. MessageNameResolver
> {{record Message(}}
> {{ String name,}}
> {{ List<Message> nestedMessages}}
> {{) {}}}
> {{interface MessageNameResolver {}}
> {{ String getMessageName(InputStream contentStream, List<Message> messages);}}
> {{}}}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)