This is possible, but it may need some development. There is a similar utility in the table tests called `org.apache.flink.table.expressions.utils.ExpressionTestBase` [1]; it translates expressions (either Table API expressions or SQL expressions) into a MapFunction.
I think you can imitate the way ExpressionTestBase works and translate the expression into a FilterFunction instead.

Best,
Jark

[1]: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala

On Fri, 5 Jun 2020 at 10:17, Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, Theo
>
> Currently, from my understanding, it's hard to do this in your DataStream
> application: converting a SQL expression into a Flink operator happens in
> the underlying table planner (more precisely, in the code generation phase),
> and the planner does not expose an interface to the user, so you cannot
> assign an operator name or operator id.
>
> Best,
> Leonard Xu
>
> On Jun 5, 2020, at 00:18, Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:
>
> Hi there,
>
> I have a full DataStream pipeline (i.e. no higher-level APIs like Table or
> SQL are involved). In the middle of the pipeline, I now need to filter on a
> SQL WHERE expression, e.g. "user = 'pitter' AND age > 10", which can include
> REGEXP and arbitrarily nested AND/OR constructs.
>
> I was wondering if I could somehow transform a SQL WHERE expression into a
> Flink FilterFunction?
>
> My approach right now is to register my stream as a table, run a SQL query
> on it and convert the result back to a DataStream, like so:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
> List<SomePOJO> data = createPOJOTestData();
> DataStream<SomePOJO> stream = env.fromCollection(data);
>
> //final Table asTable = tEnv.fromDataStream(stream);
> //Table filteredTable = asTable.where("user === 'pitter' && age > 10"); // NO SQL-style 'AND' possible here...
>
> tEnv.registerDataStream("SAMPLE", stream);
> Table filteredTable = tEnv.sqlQuery("SELECT * FROM SAMPLE WHERE user = 'pitter' AND age > 10");
>
> stream = tEnv.toAppendStream(filteredTable, SomePOJO.class);
> List<SomePOJO> list = IteratorUtils.toList(DataStreamUtils.collect(stream));
>
> //... test assertions
>
> It feels a bit odd that I need to go all the way up to the SQL API and
> register a table "just" to apply a WHERE clause, and that, once I leave the
> DataStream world, I can no longer assign a uid or operator name to this
> operator.
>
> Is the way I wrote it the best approach, or do you have a better idea? Are
> there any caveats here? Note that I deliberately didn't assign the event time
> column: I know it's just a WHERE without any windowing etc., and I wanted to
> test that it still works without an explicit time column :)
>
> Best regards
> Theo
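For the narrow case where the filters stay within plain comparisons, Theo's question (turning a WHERE expression into something a FilterFunction can call) can also be answered without the planner. The sketch below is a swapped-in technique, not the planner-based translation Jark and Leonard describe: `WhereClause` is a hypothetical, hand-written recursive-descent parser (Java 9+ because of `Map.of`) that compiles a small subset of WHERE syntax (column names, `'string'` and integer literals, `=`, `<>`, `<`, `>`, `<=`, `>=`, `AND`, `OR` and parentheses) into a `java.util.function.Predicate` over a row modelled as a `Map<String, Object>`. It does not use Flink's SQL parser, so REGEXP, NOT, LIKE, type coercion and other Flink SQL semantics are not covered and may diverge.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: compiles a small subset of SQL WHERE syntax (columns,
// 'string' and integer literals, = <> < > <= >=, AND, OR, parentheses) into a Predicate.
public class WhereClause {
    // \G anchors each match where the previous one ended, so an unknown character stops the scan.
    private static final Pattern TOKEN = Pattern.compile(
        "\\G\\s*('(?:[^']|'')*'|\\d+|<>|<=|>=|[=<>()]|[A-Za-z_][A-Za-z0-9_]*)");
    private static final Map<String, IntPredicate> OPS = Map.of( // tests sign of compare(l, r)
        "=", c -> c == 0, "<>", c -> c != 0, "<", c -> c < 0,
        ">", c -> c > 0, "<=", c -> c <= 0, ">=", c -> c >= 0);
    private final List<String> tokens = new ArrayList<>();
    private int pos = 0;

    private WhereClause(String expr) {
        Matcher m = TOKEN.matcher(expr);
        int end = 0;
        while (m.find()) { tokens.add(m.group(1)); end = m.end(); }
        if (!expr.substring(end).trim().isEmpty()) {
            throw new IllegalArgumentException("Unexpected input: " + expr.substring(end));
        }
    }

    public static Predicate<Map<String, Object>> compile(String expr) {
        WhereClause parser = new WhereClause(expr);
        Predicate<Map<String, Object>> p = parser.orExpr();
        if (parser.pos != parser.tokens.size()) throw new IllegalArgumentException("Trailing input");
        return p;
    }

    // orExpr := andExpr (OR andExpr)*  and  andExpr := primary (AND primary)*, so AND binds tighter.
    private Predicate<Map<String, Object>> orExpr() {
        Predicate<Map<String, Object>> p = andExpr();
        while (accept("OR")) p = p.or(andExpr());
        return p;
    }

    private Predicate<Map<String, Object>> andExpr() {
        Predicate<Map<String, Object>> p = primary();
        while (accept("AND")) p = p.and(primary());
        return p;
    }

    // primary := '(' orExpr ')' | operand comparison-operator operand
    private Predicate<Map<String, Object>> primary() {
        if (accept("(")) {
            Predicate<Map<String, Object>> p = orExpr();
            if (!accept(")")) throw new IllegalArgumentException("Expected ')'");
            return p;
        }
        Function<Map<String, Object>, Object> left = operand();
        IntPredicate op = OPS.get(next());
        if (op == null) throw new IllegalArgumentException("Expected a comparison operator");
        Function<Map<String, Object>, Object> right = operand();
        // A comparison with NULL is UNKNOWN. With only AND/OR (no NOT), treating UNKNOWN as false
        // keeps exactly the rows a SQL WHERE clause keeps, i.e. those that evaluate to TRUE.
        return row -> {
            Integer c = compare(left.apply(row), right.apply(row));
            return c != null && op.test(c);
        };
    }

    private Function<Map<String, Object>, Object> operand() {
        String t = next();
        if (Character.isLetter(t.charAt(0)) || t.charAt(0) == '_') return row -> row.get(t); // column
        Object value; // a literal yields the same constant for every row
        if (t.startsWith("'")) value = t.substring(1, t.length() - 1).replace("''", "'");
        else if (Character.isDigit(t.charAt(0))) value = Long.valueOf(t);
        else throw new IllegalArgumentException("Expected a column or literal, got: " + t);
        return row -> value;
    }

    // Returns null (UNKNOWN) if either side is NULL; numbers are compared as doubles for brevity.
    private static Integer compare(Object a, Object b) {
        if (a == null || b == null) return null;
        if (a instanceof Number && b instanceof Number) {
            return Double.compare(((Number) a).doubleValue(), ((Number) b).doubleValue());
        }
        if (a instanceof String && b instanceof String) return ((String) a).compareTo((String) b);
        throw new IllegalArgumentException("Cannot compare " + a + " with " + b);
    }

    private boolean accept(String token) {
        if (pos < tokens.size() && tokens.get(pos).equalsIgnoreCase(token)) { pos++; return true; }
        return false;
    }

    private String next() {
        if (pos >= tokens.size()) throw new IllegalArgumentException("Unexpected end of input");
        return tokens.get(pos++);
    }
}
```

To use it inside Theo's pipeline, one option would be a `RichFilterFunction` that keeps only the expression string, calls `WhereClause.compile` in `open()` (the lambda-based `Predicate` is not `Serializable`, so it should not be shipped with the job) and converts each POJO into such a map in `filter()`; that conversion is a further assumption not shown here. Because everything stays in the DataStream API, the operator returned by `stream.filter(...)` can then be given a `.uid(...)` and `.name(...)` as usual.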