Hey,

My job is built on SQL that is injected as an input to the job. so lets take
an example of 

Select a,max(b) as MaxB,max(c) as MaxC FROM registered_table GROUP BY a

(side note: in order for the state not to grow indefinitely i'm transforming
to a retracted stream and filtering based on a custom trigger)

In order to get the output as a Json format i basically created a way to
dynamically generate a class and registering it to the class loader, so when
transforming to the retracted stream im doing something like:

Table result = tableEnv.sqlQuery(sqlExpression);
tableEnv.toRetractStream(result, Row.class, config)
.filter(tuple -> tuple.f0)
.map(new RowToDynamicClassMapper(sqlSelectFields))
.addSink(..)

This actually works pretty good (though i do need to make sure to register
the dynamic class to the class loader whenever the state is loaded)

Im now looking into "schema evolution" - which basically means what happens
when the query is changed (say max(c) is removed, and maybe max(d) is
added). I dont know if that fits the classic "schema evolution" feature or
should that be thought about differently. Would be happy to get some
thoughts.

Thanks!







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to