Hi Shahar,

I wasn't sure which schema you are describing as the one that is going to "evolve" (is it the registered_table, or the output sink?). It would be great if you could clarify.
For the example you provided, IMO it is better considered a logic change than schema evolution:
- If you are changing max(c) to max(d) in your query, I don't think this qualifies as schema evolution.
- If you are adding another column "max(d)" to your query alongside your existing "max(c)", that might be considered a backward-compatible change.

However, in either case you will have to restart the job with the new logic. You can also consult how state schema evolution works [1], and there are many other problems that can be tricky as well [2,3].

Thanks,
Rong

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-operator-schema-evolution-savepoint-deserialization-fail-td23079.html
[3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepointing-with-Avro-Schema-change-td19290.html#a19293

On Wed, Mar 6, 2019 at 12:52 PM shkob1 <shahar.kobrin...@gmail.com> wrote:

> Hey,
>
> My job is built on SQL that is injected as an input to the job. So let's
> take an example of
>
> Select a,max(b) as MaxB,max(c) as MaxC FROM registered_table GROUP BY a
>
> (side note: in order for the state not to grow indefinitely I'm
> transforming to a retracted stream and filtering based on a custom trigger)
>
> In order to get the output in JSON format I basically created a way to
> dynamically generate a class and register it with the class loader, so
> when transforming to the retracted stream I'm doing something like:
>
> Table result = tableEnv.sqlQuery(sqlExpression);
> tableEnv.toRetractStream(result, Row.class, config)
>     .filter(tuple -> tuple.f0)
>     .map(new RowToDynamicClassMapper(sqlSelectFields))
>     .addSink(..)
>
> This actually works pretty well (though I do need to make sure to register
> the dynamic class with the class loader whenever the state is loaded).
>
> I'm now looking into "schema evolution", which basically means what happens
> when the query is changed (say max(c) is removed, and maybe max(d) is
> added). I don't know if that fits the classic "schema evolution" feature or
> whether it should be thought about differently. Would be happy to get some
> thoughts.
>
> Thanks!
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
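
P.S. To make the distinction in my reply concrete, here is a minimal sketch in plain Java (no Flink dependency) that compares the output field names of the old and new query and labels the change. The class SelectFieldDiff, the Kind enum and the field name "MaxD" are hypothetical names I made up for illustration; they are not part of Flink. It only looks at field names, so it would not notice a column that keeps its name while its expression changes, and it says nothing about whether existing state can be restored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SelectFieldDiff {

    /** Hypothetical labels for a query change; not a Flink concept. */
    public enum Kind { UNCHANGED, ADDITIVE, BREAKING }

    /** Fields present in the new query output but not in the old one. */
    public static List<String> added(List<String> oldFields, List<String> newFields) {
        List<String> out = new ArrayList<>(newFields);
        out.removeAll(oldFields);
        return out;
    }

    /** Fields present in the old query output but missing from the new one. */
    public static List<String> removed(List<String> oldFields, List<String> newFields) {
        List<String> out = new ArrayList<>(oldFields);
        out.removeAll(newFields);
        return out;
    }

    /**
     * Any removed field is treated as breaking (assumption: downstream
     * readers may depend on it); only-added fields are treated as additive.
     */
    public static Kind classify(List<String> oldFields, List<String> newFields) {
        if (!removed(oldFields, newFields).isEmpty()) {
            return Kind.BREAKING;
        }
        if (!added(oldFields, newFields).isEmpty()) {
            return Kind.ADDITIVE;
        }
        return Kind.UNCHANGED;
    }

    public static void main(String[] args) {
        List<String> current = Arrays.asList("a", "MaxB", "MaxC");
        // max(d) added alongside max(c)
        List<String> withExtra = Arrays.asList("a", "MaxB", "MaxC", "MaxD");
        // max(c) replaced by max(d)
        List<String> replaced = Arrays.asList("a", "MaxB", "MaxD");

        System.out.println(classify(current, withExtra));
        System.out.println(classify(current, replaced));
    }
}
```

A check like this could run before deploying a new SQL string, for example to decide whether the sink's consumers need to be notified. Either way the job still has to be restarted, as noted above.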