As an additional question, I would like to ask whether it is compulsory to specify the schema of the input struct statically within the UDF implementation, or whether it can be determined dynamically.
To add a bit more detail, my use case is the following: I have a handful of topics on which JSON messages with this structure are published (they are CDC events):

    {"rootField": "value", "timestamp": 1234,
     "before": {"field1": 1, "field1_isMissing": false},
     "after":  {"field1": 2, "field1_isMissing": false}}

I want to convert these JSONs to Avro and then, for each one, output a sort of "merged" version: filtering out some fields, keeping only one field between before.field1 and after.field1, and removing the fields ending in '_isMissing', as a flat structure like:

    {"field1": 2, "timestamp": 1234}

There are a bunch of topics, and each topic has a different schema: the "before" and "after" fields have different struct schemas per topic (again, these are CDC events), because each topic is a different table. So the UDF handling this should be generic and dynamic enough.

Now, apart from the conversion from JSON to Avro, which I could handle by creating streams with VALUE_FORMAT='avro' starting from the Avro schemas I will be given (then why are the messages in JSON format and not Avro? Unfortunately, that doesn't depend on me :/ ), the real challenge is how to implement the merge operation. As already explained, ideally I would like to implement a UDF which could be used as:

    CREATE STREAM output_stream AS
      SELECT unnest(merge(before, after)), timestamp FROM avro_stream;

to output a flat structure.

To sum up my questions:

1. First of all: is it even possible to do this in KSQL? If so:

   a. I see there is the possibility of using the @SchemaProvider annotation for dynamic typing, but I don't find the documentation particularly clear: "To use this functionality, you need to specify a method with signature public SqlType <your-method-name>(final List<SqlType> params) and annotate it with @SchemaProvider. Also, you need to link it to the corresponding UDF by using the schemaProvider=<your-method-name> parameter of the @Udf annotation."
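Just to make the intended semantics of merge(before, after) concrete, here is a rough sketch with plain Java Maps standing in for Connect Structs (the real UDF would of course work on Structs and their Schemas; the field names are taken from the example above, and the helper is hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeSketch {
    // Sketch of the intended merge(before, after) semantics, with plain
    // Maps standing in for Connect Structs. For each field: take the value
    // from 'after' when present, fall back to 'before', and drop the
    // '_isMissing' helper fields entirely.
    public static Map<String, Object> merge(Map<String, Object> before,
                                            Map<String, Object> after) {
        Map<String, Object> merged = new LinkedHashMap<>();
        if (before != null) {
            merged.putAll(before);
        }
        if (after != null) {
            merged.putAll(after); // 'after' wins on common fields
        }
        merged.keySet().removeIf(k -> k.endsWith("_isMissing"));
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> before = new LinkedHashMap<>();
        before.put("field1", 1);
        before.put("field1_isMissing", false);

        Map<String, Object> after = new LinkedHashMap<>();
        after.put("field1", 2);
        after.put("field1_isMissing", false);

        System.out.println(merge(before, after)); // {field1=2}
    }
}
```

The open question is how to express exactly this over Connect Structs with per-topic schemas, which is where the @SchemaProvider mechanism seems to come in.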
      i.  What is represented by the input parameter final List<SqlType>? Is it the SqlTypes of the UDF's runtime inputs, so that in params.get(0) I would find (in my use case) the Struct type of the 'before' field? I guess the returned SqlType is the type against which the main UDF method is validated.

      ii. How should this method even be implemented? I cannot find any example of this.

2. Is it possible to unnest a Struct field? I cannot find any reference to this, and since there are quite a few different topics, I really don't want to manually define an additional stream per topic with something like SELECT strct->field1 AS field1, timestamp FROM output_stream.

I hope it wasn't too much reading. Thank you for your help,
Federico

On Mon, Mar 2, 2020 at 09:47 Federico D'Ambrosio <fedex...@gmail.com> wrote:

> Hello everyone,
>
> is there any example of a UDF which has Structs as input parameters or
> outputs?
>
> I'm currently implementing a UDF which should be something like:
>
>     Struct merge(Struct before, Struct after)
>
> (these structs come from nested JSON objects, like {"field1": "value",
> "before": {....}, "after": {....}})
>
> Now, I was using KsqlStruct to enforce types and set values, but then I
> found this issue (https://github.com/confluentinc/ksql/issues/3413).
>
> So, I'm wondering if there's any example of how to use Structs in UDFs,
> since I cannot seem to find any in the official documentation.
>
> Thank you very much for your help,
> Federico D'Ambrosio

--
Federico D'Ambrosio