As an additional question, I would like to ask whether it is compulsory to
specify the schema of the input struct statically within the UDF
implementation.

To add a bit more detail, my use case is the following: I have a handful of
topics on which JSON messages with the following structure are published
(they are CDC events):

{"rootField": "value", "timestamp": 1234,
"before": {"field1": 1, "field1_isMissing": false},
"after": {"field1": 2, "field1_isMissing": false}
}

I want to convert these JSONs to Avro and then, for each JSON, output a
sort of "merged" version of it: filtering out some fields, keeping only
one of before.field1 and after.field1, and removing the fields ending in
'_isMissing', as a flat structure like:

{"field1": 2, "timestamp": 1234}

There are a bunch of topics, and each topic has a different schema: the
"before" and "after" fields have different struct schemas per topic
(again, these are CDC events), because each topic corresponds to a
different table. The UDF handling this should therefore be generic and
dynamic enough.
Now, apart from the conversion from JSON to Avro, which I could handle by
creating streams with VALUE_FORMAT='avro' starting from the Avro schemas I
will be given (then why is the message in JSON format and not Avro?
Unfortunately, that doesn't depend on me :/ ), the real challenge is how to
implement the merge operation.
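Just to pin down what I mean by "merge", here is the logic sketched in plain Java over Maps (a hypothetical stand-in for the actual Structs, purely to illustrate the rules; the field names are taken from the example above):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeSketch {
    // Merge the "before" and "after" objects: drop the *_isMissing
    // bookkeeping fields, and for each remaining field keep the "after"
    // value when present, falling back to the "before" value otherwise.
    static Map<String, Object> merge(Map<String, Object> before,
                                     Map<String, Object> after) {
        Map<String, Object> merged = new LinkedHashMap<>();
        for (String field : before.keySet()) {
            if (field.endsWith("_isMissing")) {
                continue; // filter out the bookkeeping fields
            }
            Object afterValue = (after == null) ? null : after.get(field);
            merged.put(field, afterValue != null ? afterValue : before.get(field));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> before = new LinkedHashMap<>();
        before.put("field1", 1);
        before.put("field1_isMissing", false);
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("field1", 2);
        after.put("field1_isMissing", false);
        System.out.println(merge(before, after)); // {field1=2}
    }
}
```

The hard part is doing exactly this over Structs whose schemas are only known at query time, which is what the questions below are about.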

As already explained, ideally I would like to implement a UDF which could
be used as

CREATE STREAM output_stream AS SELECT unnest(merge(before, after)),
timestamp FROM avro_stream;

to output a flat structure.

To sum up my questions:

   1. First of all: is it even possible to do this in KSQL?

If so:

   2. I see there's the possibility to use the @SchemaProvider annotation
   to enable dynamic typing, but I don't find the documentation
   particularly clear: "To use this functionality, you need to specify a
   method with signature public SqlType
   <your-method-name>(final List<SqlType> params) and annotate it with
   @SchemaProvider. Also, you need to link it to the corresponding UDF by
   using the schemaProvider=<your-method-name> parameter of the @Udf
   annotation."
      a. What does the input parameter final List<SqlType> represent? Is
      it the SqlTypes of the UDF's runtime inputs, so that in
      params.get(0) I would find (in my use case) the Struct type of the
      'before' field? I guess the returned SqlType is the type against
      which the main UDF method is validated.
      b. How should this method even be implemented? I cannot find any
      example of this.
   3. Is it possible to unnest a Struct field? I cannot find any reference
   on this, and since there would be quite a few different topics, I really
   don't want to manually specify an additional stream with something like
   SELECT strct->field1 AS field1, timestamp FROM output_stream.
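To make question 2b concrete, here is my current guess at a skeleton (completely untested; the class and package names are from my reading of the ksqlDB UDF docs and may be off, and whether a Struct parameter can even be declared without a static schema is exactly what I'm unsure about):

```java
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.schema.ksql.types.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import java.util.List;
import org.apache.kafka.connect.data.Struct;

@UdfDescription(name = "merge", description = "Merge CDC before/after structs")
public class MergeUdf {

  // Unclear whether a STRUCT parameter can omit a static
  // @UdfParameter(schema = "...") -- this is my opening question.
  @Udf(schemaProvider = "mergeSchema")
  public Struct merge(@UdfParameter final Struct before,
                      @UdfParameter final Struct after) {
    // ... build and return the merged Struct here (placeholder) ...
    return after;
  }

  // My understanding: called at query-analysis time with the SqlTypes of
  // the actual arguments, so params.get(0) would be the type of 'before',
  // and the returned SqlType is the declared return type of the UDF call.
  @UdfSchemaProvider
  public SqlType mergeSchema(final List<SqlType> params) {
    final SqlType beforeType = params.get(0);
    if (beforeType.baseType() != SqlBaseType.STRUCT) {
      throw new IllegalArgumentException("merge expects STRUCT arguments");
    }
    // Ideally the merged schema would be derived from beforeType here,
    // e.g. dropping the *_isMissing fields; returning the input type
    // unchanged is only a placeholder.
    return beforeType;
  }
}
```

Is this roughly the intended shape, or am I misreading the docs?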

I hope this wasn't too long a read.

Thank you for your help,
Federico


On Mon, Mar 2, 2020 at 09:47 Federico D'Ambrosio <
fedex...@gmail.com> wrote:

> Hello everyone,
>
> is there any example of UDF which has Structs as input parameters or
> outputs?
>
> I'm currently implementing a UDF which should be something like:
>
> Struct merge(Struct before, Struct after)
>
> (these structs are from nested JSON objects, like {"field1": "value",
> "before": {....}, "after": {....}})
> Now, I was using KsqlStruct to enforce types and set values, but then I
> found this issue (https://github.com/confluentinc/ksql/issues/3413).
>
> So, I'm wondering if there's any example of how to use Structs in UDFs,
> since I cannot seem to find any in the official documentation.
>
> Thank you very much for your help,
> Federico D'Ambrosio
>


-- 
Federico D'Ambrosio
