[ 
https://issues.apache.org/jira/browse/NIFI-14424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lukas Kucharski reassigned NIFI-14424:
--------------------------------------

    Assignee: Lukas Kucharski  (was: Alaksiej Ščarbaty)

> Support schema registries in ProtobufReader
> -------------------------------------------
>
>                 Key: NIFI-14424
>                 URL: https://issues.apache.org/jira/browse/NIFI-14424
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Alaksiej Ščarbaty
>            Assignee: Lukas Kucharski
>            Priority: Major
>
> Protobuf record parsing is currently handled by the 
> [ProtobufReader|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L58],
>  which supports proto files on a [local filesystem 
> only|https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java#L157].
> The change aims to support schema registries as a source of proto 
> definitions. 
> Since the original proto files must be present for parsing, new 
> abstractions related to schemas will be introduced.
> h2. SchemaDefinition
> A raw schema definition. Contains a {_}SchemaIdentifier{_}, schema text and a 
> collection of referenced {_}SchemaDefinitions{_}.
> h2. SchemaDefinitionProvider
> New _SchemaDefinitionProviders_ provide a simple interface for retrieving 
> Schemas as text, together with their references. Unlike their 
> _SchemaRegistry_ counterparts, no parsing is going to take place on that 
> level, and it’s up to the caller what to do with returned texts.
> The interface will be put into the _nifi-schema-registry-service-api_ module, 
> whereas the implementations should be created in their respective modules, 
> like _nifi-confluent-schema-registry-service_ for Confluent.
> h2. MessageNameResolver
> In the case of Protobuf a single proto file may contain multiple message 
> entries. To understand what message to use for decoding, a message index is 
> usually encoded in the payload. And as with a schema reference, different 
> systems encode this index differently.
> _MessageNameResolver_ will be a new _ControllerService_ that translates 
> an encoded Protobuf message index into a Protobuf message name, 
> which is later used to find the correct Protobuf message for 
> deserialization.
> Similar to the {_}SchemaDefinitionProvider{_}, the interface will be put into the 
> _nifi-schema-registry-service-api_ module. Schema Registry dependent 
> implementations will land in their respective modules, e.g. 
> _nifi-confluent-schema-registry-service_ for Confluent.
> h2. ProtobufReader
> Since the existing ProtobufReader extends {_}SchemaRegistryService{_}, we’ll 
> conform to that interface and add a new {_}Schema Access Strategy - 
> SCHEMA_DEFINITION_PROVIDER{_}.
> The _createRecordReader_ method will be split into two branches: one for 
> _SCHEMA_DEFINITION_PROVIDER_ (new) and one for the rest (the existing one). 
> The _getSupportedPropertyDescriptors_ method will be modified to define the new 
> properties, as well as to make the _SCHEMA_REFERENCE_READER_ depend on 
> the _SCHEMA_DEFINITION_PROVIDER_ value.
> Attribute modifications:
>  
> |*Attribute Name*|*Description*|*Value*|*Comment*|
> |Schema Reference Reader|Service implementation responsible for reading 
> FlowFile attributes or content to determine the Schema Reference 
> Identifier.|<{_}SchemaReferenceReader{_}> impl|That’s going to be the 
> existing property in SchemaRegistryService.|
> |Schema Definition Provider|A provider that will retrieve Protobuf schema 
> definitions.|<{_}SchemaDefinitionProvider{_}> impl|This is a new property, 
> available only when _Schema Access Strategy_ is _SCHEMA_DEFINITION_PROVIDER._|
> |Message Name Resolver|Service implementation responsible for reading 
> FlowFile attributes or content to determine the name of a Protobuf message to 
> use for deserialization.|<{_}MessageNameResolver{_}> impl|This is a new 
> property, available only when _Schema Access Strategy_ is 
> _SCHEMA_DEFINITION_PROVIDER._|
> |Proto Directory, Message Type| | |*These properties will be hidden when* 
> *_Schema Access Strategy_* *is* *_SCHEMA_DEFINITION_PROVIDER._*
> *Currently they’re mandatory.* |
> |_Schema Caching parameters, like size and expiration_| | |These are new 
> properties, available only when _Schema Access Strategy_ is 
> _SCHEMA_DEFINITION_PROVIDER._|
> h3. Custom Protobuf Loader
> Wire’s 
> [SchemaLoader|https://github.com/square/wire/blob/master/wire-schema/src/jsMain/kotlin/com/squareup/wire/schema/SchemaLoader.kt]
>  can’t be used with {_}SchemaDefinitionProviders{_}, as it’s tightly coupled to 
> a file system. Instead of implementing a file system adapter, custom 
> glue code will be implemented, which either uses a _SchemaDefinitionProvider_ 
> for loading, or accepts an already fetched SchemaDefinition. See 
> [CommonSchemaLoader|https://github.com/square/wire/blob/master/wire-schema/src/commonMain/kotlin/com/squareup/wire/schema/internal/CommonSchemaLoader.kt]
>  for inspiration.
> This logic will reside in _nifi-protobuf-services_ too.
> h2. Appendix: Code examples
> h3. SchemaDefinitionProvider
> {{record SchemaDefinition(}}
> {{  SchemaIdentifier id,}}
> {{  String text,}}
> {{  // key: the name by which the schema is referenced from its parent}}
> {{  Map<String, SchemaDefinition> references}}
> {{) {}}}
> {{interface SchemaDefinitionProvider {}}
> {{  SchemaDefinition getById(SchemaIdentifier id);}}
> {{}}}
> {{class ConfluentSchemaDefinitionProvider implements SchemaDefinitionProvider 
> {}}
> {{  // ...}}
> {{}}}
> h3. MessageNameResolver
> {{record Message(}}
> {{  String name,}}
> {{  List<Message> nestedMessages}}
> {{) {}}}
> {{interface MessageNameResolver {}}
> {{  String getMessageName(InputStream contentStream, List<Message> messages);}}
> {{}}}
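
The MessageNameResolver idea above could be sketched roughly as follows. This is only an illustration under stated assumptions: it assumes the index is encoded as a count followed by that many zigzag varints, with a single 0 byte standing for the first top-level message (the scheme the Confluent wire format is generally described as using). The class name IndexPathMessageNameResolver, its helper methods, and the dot-joined name without a package prefix are hypothetical and not part of the proposal.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Mirrors the Message record from the appendix.
record Message(String name, List<Message> nestedMessages) {}

// Hypothetical resolver: walks an index path through nested messages.
final class IndexPathMessageNameResolver {

    // Reads one zigzag-encoded varint (assumed encoding of each value).
    static int readZigZagVarint(InputStream in) throws IOException {
        int raw = 0;
        int shift = 0;
        while (true) {
            int b = in.read();
            if (b < 0) {
                throw new IOException("Unexpected end of stream");
            }
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
            if (shift > 28) {
                throw new IOException("Varint too long");
            }
        }
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Reads a count followed by that many indexes.
    // Assumption: a count of 0 is shorthand for the path [0].
    static List<Integer> readIndexPath(InputStream in) throws IOException {
        int count = readZigZagVarint(in);
        if (count < 0) {
            throw new IOException("Negative index count: " + count);
        }
        List<Integer> path = new ArrayList<>();
        if (count == 0) {
            path.add(0);
            return path;
        }
        for (int i = 0; i < count; i++) {
            path.add(readZigZagVarint(in));
        }
        return path;
    }

    // Same shape as MessageNameResolver.getMessageName in the appendix.
    static String getMessageName(InputStream contentStream, List<Message> messages) {
        List<Integer> path;
        try {
            path = readIndexPath(contentStream);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        StringBuilder name = new StringBuilder();
        List<Message> level = messages;
        for (int index : path) {
            if (index < 0 || index >= level.size()) {
                throw new IllegalArgumentException("Message index out of range: " + index);
            }
            Message message = level.get(index);
            if (name.length() > 0) {
                name.append('.');
            }
            name.append(message.name());
            level = message.nestedMessages();
        }
        return name.toString();
    }
}
```

A real implementation would presumably also prepend the proto package to the resolved name and would first consume whatever header bytes precede the index in the payload; both are left out of this sketch.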

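For the custom Protobuf loader, one plausible first step is to flatten a SchemaDefinition and its transitive references into a map of file name to proto text that a parser can then consume. The sketch below is an assumption about how that glue code might look, not the planned implementation: SchemaDefinitionFlattener and its methods are hypothetical names, and a plain String id stands in for SchemaIdentifier to keep the example self-contained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for the proposed record: a String id replaces SchemaIdentifier.
record SchemaDefinition(String id, String text, Map<String, SchemaDefinition> references) {}

// Hypothetical helper that gathers every proto text reachable from a root definition.
final class SchemaDefinitionFlattener {

    // Returns proto texts keyed by the name under which each schema is referenced.
    // The root is stored under rootName and always comes first.
    static Map<String, String> flatten(String rootName, SchemaDefinition root) {
        Map<String, String> protoFiles = new LinkedHashMap<>();
        collect(rootName, root, protoFiles);
        return protoFiles;
    }

    private static void collect(String name, SchemaDefinition definition, Map<String, String> out) {
        if (out.containsKey(name)) {
            return; // already collected, e.g. an import shared by several schemas
        }
        out.put(name, definition.text());
        definition.references().forEach((refName, ref) -> collect(refName, ref, out));
    }
}
```

Keying by reference name assumes that the same name always points at the same schema across the whole tree; a loader that cannot rely on this would need to key by SchemaIdentifier instead.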


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
