Hi Shahar,

I wasn't sure which schema you are describing as going to "evolve" (is it
the registered_table, or the output sink?). It would be great if you could
clarify.

For the example you provided, IMO it is better considered a logic change
than a schema evolution:
- If you are changing max(c) to max(d) in your query, I don't think this
qualifies as schema evolution.
- If you are adding another column "max(d)" to your query alongside your
existing "max(c)", that might be considered a backward-compatible change.
However, in either case you will have to restart the job with the new query.
You can also consult how state schema evolution works [1]; there are many
other problems that can be tricky as well [2,3].
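
The difference between the two cases above can be made concrete with a small
check. This is a minimal sketch only: `isBackwardCompatible` is a
hypothetical helper (not a Flink API) that treats a change as backward
compatible when every output field of the old query is still produced by the
new one. The aliases `MaxB` and `MaxC` come from the example query; `MaxD` is
an assumed alias for max(d).

```java
import java.util.List;

public class QueryChangeCheck {

    // Hypothetical helper (not a Flink API): a query change counts as backward
    // compatible here only if every output field of the old query is still
    // produced by the new one, i.e. fields were added but none removed or renamed.
    // Field order is deliberately ignored.
    static boolean isBackwardCompatible(List<String> oldFields, List<String> newFields) {
        return newFields.containsAll(oldFields);
    }

    public static void main(String[] args) {
        // Output aliases of the example query; MaxD is an assumed alias for max(d).
        List<String> original = List.of("a", "MaxB", "MaxC");
        // max(c) replaced by max(d): MaxC no longer appears in the output.
        List<String> replaced = List.of("a", "MaxB", "MaxD");
        // max(d) added next to max(c): every original field is still there.
        List<String> extended = List.of("a", "MaxB", "MaxC", "MaxD");

        System.out.println(isBackwardCompatible(original, replaced)); // false
        System.out.println(isBackwardCompatible(original, extended)); // true
    }
}
```

This only compares output field names. It says nothing about whether the
aggregation state in an existing savepoint can be restored after the change,
which is where the state schema evolution issues in [1,2,3] come in.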

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-operator-schema-evolution-savepoint-deserialization-fail-td23079.html
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepointing-with-Avro-Schema-change-td19290.html#a19293


On Wed, Mar 6, 2019 at 12:52 PM shkob1 <shahar.kobrin...@gmail.com> wrote:

> Hey,
>
> My job is built on SQL that is injected as an input to the job. So let's
> take an example of
>
> SELECT a, MAX(b) AS MaxB, MAX(c) AS MaxC FROM registered_table GROUP BY a
>
> (side note: in order for the state not to grow indefinitely, I'm
> transforming to a retract stream and filtering based on a custom trigger)
>
> In order to get the output in JSON format, I basically created a way to
> dynamically generate a class and register it with the class loader, so
> when transforming to the retract stream I'm doing something like:
>
> Table result = tableEnv.sqlQuery(sqlExpression);
> tableEnv.toRetractStream(result, Row.class, config)
> .filter(tuple -> tuple.f0)
> .map(new RowToDynamicClassMapper(sqlSelectFields))
> .addSink(..)
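
In Flink's `toRetractStream`, a `true` flag marks an add message and a
`false` flag marks a retract message, so `filter(tuple -> tuple.f0)` keeps
only the adds. The following is a simplified plain-Java model of that step,
not Flink's actual operator: `RetractStreamSketch` and its `Change` type are
hypothetical stand-ins for `Tuple2<Boolean, Row>`, and the query is reduced
to `SELECT a, MAX(b) ... GROUP BY a`. When a group already has a result, the
old row is retracted before the updated row is added.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RetractStreamSketch {

    // Hypothetical stand-in for Flink's Tuple2<Boolean, Row>: add == true is an
    // add message, add == false retracts a previously emitted row.
    static final class Change {
        final boolean add;
        final String key;
        final int maxB;

        Change(boolean add, String key, int maxB) {
            this.add = add;
            this.key = key;
            this.maxB = maxB;
        }

        @Override
        public String toString() {
            return (add ? "+" : "-") + "(" + key + ", " + maxB + ")";
        }
    }

    // Simplified model of "SELECT a, MAX(b) FROM t GROUP BY a" over (a, b) input
    // pairs: when a group already has a result, the old row is retracted before
    // the updated row is added.
    static List<Change> toRetractStream(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> maxPerKey = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (Map.Entry<String, Integer> row : input) {
            String a = row.getKey();
            Integer previous = maxPerKey.get(a);
            if (previous == null) {
                maxPerKey.put(a, row.getValue());
            } else {
                out.add(new Change(false, a, previous)); // retract the old result
                maxPerKey.put(a, Math.max(previous, row.getValue()));
            }
            out.add(new Change(true, a, maxPerKey.get(a))); // add the current result
        }
        return out;
    }

    // Equivalent of .filter(tuple -> tuple.f0): keep only the add messages.
    static List<Change> keepAdds(List<Change> changes) {
        return changes.stream().filter(c -> c.add).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("x", 1), Map.entry("x", 5), Map.entry("y", 2), Map.entry("x", 3));
        List<Change> changes = toRetractStream(input);
        System.out.println(changes);           // [+(x, 1), -(x, 1), +(x, 5), +(y, 2), -(x, 5), +(x, 5)]
        System.out.println(keepAdds(changes)); // [+(x, 1), +(x, 5), +(y, 2), +(x, 5)]
    }
}
```

Because the filter drops the `false` messages, the sink cannot tell that
`+(x, 1)` was later superseded by `+(x, 5)`; it simply receives one record
per update of a group.
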
>
> This actually works pretty well (though I do need to make sure to register
> the dynamic class with the class loader whenever the state is loaded).
>
> I'm now looking into "schema evolution", which basically means what happens
> when the query is changed (say max(c) is removed, and maybe max(d) is
> added). I don't know if that fits the classic "schema evolution" feature or
> whether it should be thought about differently. I would be happy to get some
> thoughts.
>
> Thanks!
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
