Hey,

My job is built on SQL that is injected as an input to the job. Take this query as an example:

    SELECT a, MAX(b) AS MaxB, MAX(c) AS MaxC FROM registered_table GROUP BY a

(Side note: so that the state does not grow indefinitely, I'm converting to a retract stream and filtering based on a custom trigger.)

To get the output in JSON format, I created a way to dynamically generate a class and register it with the class loader. So when converting to the retract stream I'm doing something like:

    Table result = tableEnv.sqlQuery(sqlExpression);
    tableEnv.toRetractStream(result, Row.class, config)
        .filter(tuple -> tuple.f0)
        .map(new RowToDynamicClassMapper(sqlSelectFields))
        .addSink(..)

This actually works pretty well (though I do need to make sure to register the dynamic class with the class loader whenever the state is loaded).

I'm now looking into "schema evolution", which basically means: what happens when the query is changed (say MAX(c) is removed, and maybe MAX(d) is added)? I don't know if that fits the classic "schema evolution" feature or whether it should be thought about differently.

Would be happy to get some thoughts.

Thanks!
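To make the query-change case concrete, here is a minimal sketch in plain Java (no Flink dependency) of one possible way to think about it: reconciling a previously stored row with the new query's select list by field name. The class `FieldReconciler` and its `reconcile` method are hypothetical names made up for illustration and are not part of Flink. The null-for-new-fields behaviour is also just an assumption about what might be wanted, not a description of how Flink's own state schema evolution works.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Flink API): matches old and new select fields by name.
final class FieldReconciler {

    // Re-orders the old values to follow the new field list.
    // Fields only in the new query get null (nothing has been aggregated for them yet);
    // fields only in the old query are dropped.
    static Map<String, Object> reconcile(List<String> oldFields,
                                         List<Object> oldValues,
                                         List<String> newFields) {
        if (oldFields.size() != oldValues.size()) {
            throw new IllegalArgumentException("oldFields and oldValues must have the same size");
        }
        // Index the old row by field name.
        Map<String, Object> byName = new LinkedHashMap<>();
        for (int i = 0; i < oldFields.size(); i++) {
            byName.put(oldFields.get(i), oldValues.get(i));
        }
        // Build the new row in the order of the new select list.
        Map<String, Object> result = new LinkedHashMap<>();
        for (String field : newFields) {
            result.put(field, byName.get(field));
        }
        return result;
    }

    public static void main(String[] args) {
        // Old query selected a, MaxB, MaxC; the new query selects a, MaxB, MaxD.
        Map<String, Object> migrated = reconcile(
                Arrays.asList("a", "MaxB", "MaxC"),
                Arrays.<Object>asList("key1", 10, 20),
                Arrays.asList("a", "MaxB", "MaxD"));
        System.out.println(migrated); // prints {a=key1, MaxB=10, MaxD=null}
    }
}
```

In this sketch MaxC disappears and MaxD starts out empty, which is the behaviour I would naively expect after such a query change. Whether the state backend can be made to behave like this is exactly the open question.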