Hi Shahar,

1. Do you mean that the incoming data source is published as JSON and that
you have a custom Pojo source function / table source that converts it? In
that case you are the one maintaining the schema evolution support, am I
correct? For Avro, I think you can refer to [1].
2. If you change the SQL, you will have to recompile and rerun your job.
The newly compiled SQL will then yield the correct logic to run against
your new schema, so I don't foresee this being an issue. For the second
problem: yes, it is the responsibility of your custom serialization sink
function to convert the Row into objects of the output class. I am not sure
whether [2] is the piece of code you are looking for, but if you are using
Avro you might be able to leverage it.

If you are sticking with your own format of generated/dynamic classes, you
might have to create them in your custom source/sink table.
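
On matching Row values to your generated class by name rather than by
index: Row itself is purely positional, so the names have to come from
somewhere else, for example the sqlSelectFields you already pass to your
mapper. Below is a minimal sketch of that idea in plain Java, not tied to
any Flink API. RowByNameMapper and SampleOutput are hypothetical names
made up for illustration, the Object[] stands in for the Row's values, and
I am assuming your generated class exposes public fields named after the
select aliases.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: maps positional row values to a target object by field name. */
final class RowByNameMapper {

    private RowByNameMapper() {}

    /** Pairs each select-field name with the value at the same position in the row. */
    static Map<String, Object> toNamedFields(String[] fieldNames, Object[] rowValues) {
        if (fieldNames.length != rowValues.length) {
            throw new IllegalArgumentException(
                "Expected " + fieldNames.length + " values but got " + rowValues.length);
        }
        Map<String, Object> named = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            named.put(fieldNames[i], rowValues[i]);
        }
        return named;
    }

    /**
     * Copies values into the same-named public fields of the target.
     * Fields with no matching name keep their default value, and names
     * with no matching field are ignored.
     */
    static <T> T populate(T target, Map<String, Object> named) {
        for (Field field : target.getClass().getFields()) {
            if (named.containsKey(field.getName())) {
                try {
                    field.set(target, named.get(field.getName()));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Cannot set field " + field.getName(), e);
                }
            }
        }
        return target;
    }
}

/** Hypothetical stand-in for the dynamically generated output class. */
final class SampleOutput {
    public String a;
    public Long MaxB;
    public Long MaxD;
}
```

With this approach a field added in the middle of the select list does not
shift the others, because the lookup goes through the name and not the
position. It does assume the names you pair with a Row describe the query
that actually produced that Row.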

Thanks,
Rong

[1]
https://github.com/apache/flink/tree/release-1.7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro
[2]
https://github.com/apache/flink/blob/release-1.7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java#L170

On Thu, Mar 7, 2019 at 11:20 AM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> Thanks for the response Rong. Would be happy to clarify more.
> So there are two possible changes that could happen:
>
>    1. There could be a change in the incoming source schema. Since
>    there's a deserialization phase here (JSON -> Pojo), I expect a couple
>    of options. Backward-compatible changes to the JSON should not have an
>    impact (as the Pojo is the same); however, we might want to change the
>    Pojo, which I believe is a state-evolving action. I do want to migrate
>    the Pojo to Avro - will that suffice for the schema evolution feature
>    to work?
>    2. The other possible change is to the SQL select fields: as
>    mentioned, someone could add, delete, or reorder a field in the SQL
>    Select. I do see this as an issue because of the way I transform the
>    Row object to the dynamically generated class. This is done today by
>    matching the indices of the class fields with those of the Row object.
>    That breaks when, for example, a select field is added in the middle
>    and there is now an older Row whose field order does not match the
>    (new) generated class's field order. I'm thinking about how to solve
>    that one - I imagine this is not something the schema evolution
>    feature can solve (am I right?). I'm wondering whether there is a way
>    to transform the Row object to my generated class by matching the
>    Row's column names to the generated class's field names, though I
>    don't see that the Row object has any notion of column names.
>
> Would love to hear your thoughts. If you want me to paste some code here, I
> can do so.
>
> Shahar
>
> On Thu, Mar 7, 2019 at 10:40 AM Rong Rong <walter...@gmail.com> wrote:
>
>> Hi Shahar,
>>
>> I wasn't sure which schema you are describing as the one that is going to
>> "evolve" (is it the registered_table, or the output sink?). It would be
>> great if you could clarify more.
>>
>> For the example you provided, IMO it is more of a logic change than
>> schema evolution:
>> - If you are changing max(c) to max(d) in your query, I don't think this
>> qualifies as schema evolution.
>> - If you are adding another column "max(d)" to your query along with your
>> existing "max(c)", that might be considered a backward-compatible change.
>> However, in either case you will have to restart your logic. You can also
>> consult how state schema evolution works [1], and there are many other
>> problems that can be tricky as well [2,3].
>>
>> Thanks,
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
>> [2]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-operator-schema-evolution-savepoint-deserialization-fail-td23079.html
>> [3]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepointing-with-Avro-Schema-change-td19290.html#a19293
>>
>>
>> On Wed, Mar 6, 2019 at 12:52 PM shkob1 <shahar.kobrin...@gmail.com>
>> wrote:
>>
>>> Hey,
>>>
>>> My job is built on SQL that is injected as an input to the job. So let's
>>> take an example of
>>>
>>> Select a,max(b) as MaxB,max(c) as MaxC FROM registered_table GROUP BY a
>>>
>>> (side note: in order for the state not to grow indefinitely, I'm
>>> transforming to a retracted stream and filtering based on a custom
>>> trigger)
>>>
>>> In order to get the output in JSON format, I basically created a way to
>>> dynamically generate a class and register it with the class loader, so
>>> when transforming to the retracted stream I'm doing something like:
>>>
>>> Table result = tableEnv.sqlQuery(sqlExpression);
>>> tableEnv.toRetractStream(result, Row.class, config)
>>> .filter(tuple -> tuple.f0)
>>> .map(new RowToDynamicClassMapper(sqlSelectFields))
>>> .addSink(..)
>>>
>>> This actually works pretty well (though I do need to make sure to
>>> register the dynamic class with the class loader whenever the state is
>>> loaded)
>>>
>>> I'm now looking into "schema evolution", which basically means what
>>> happens when the query is changed (say max(c) is removed, and maybe
>>> max(d) is added). I don't know whether that fits the classic "schema
>>> evolution" feature or should be thought about differently. Would be
>>> happy to get some thoughts.
>>>
>>> Thanks!
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
