This is possible, but it may need some development. There is a similar utility in
the Table API tests called
`org.apache.flink.table.expressions.utils.ExpressionTestBase` [1].
It converts/translates expressions (either Table API expressions or SQL
expressions) into a MapFunction.

I think you can imitate the approach of ExpressionTestBase and translate the
expression into a FilterFunction instead.
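
For comparison, here is a minimal sketch of a different, hand-rolled technique. It is a tiny interpreter, not the planner code generation that ExpressionTestBase relies on, and it parses a restricted WHERE string once into a serializable predicate. The class `WhereFilter` and its nested `Cond` interface are hypothetical names, not Flink APIs. Records are assumed to be `Map<String, Object>`, and only `=`, `<>`, `<`, `>`, `<=`, `>=`, `AND`, `OR` and parentheses are supported. REGEXP, NOT, IS NULL and escaped quotes inside string literals are not handled.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Flink API): a tiny interpreter for a subset of SQL WHERE.
//   orExpr  := andExpr ("OR" andExpr)*
//   andExpr := atom ("AND" atom)*
//   atom    := "(" orExpr ")" | IDENT op literal
//   op      := = | <> | < | > | <= | >=     literal := 'string' | number
// Assumption: records are Map<String, Object> (field name -> value).
final class WhereFilter {
    // Serializable, so a Flink FilterFunction lambda may capture it.
    interface Cond extends Predicate<Map<String, Object>>, Serializable {}

    private static final Pattern TOKEN = Pattern.compile(
        "\\s*(<>|<=|>=|[=<>()]|'[^']*'|[A-Za-z_][A-Za-z0-9_]*|-?\\d+(?:\\.\\d+)?)");
    private static final List<String> OPS = Arrays.asList("=", "<>", "<", ">", "<=", ">=");

    private final List<String> tokens = new ArrayList<>();
    private int pos = 0;

    // Splits the input into tokens; fails on any character the grammar does not know.
    private WhereFilter(String sql) {
        Matcher m = TOKEN.matcher(sql);
        int i = 0;
        while (i < sql.length()) {
            m.region(i, sql.length());
            if (!m.lookingAt()) throw new IllegalArgumentException("Cannot tokenize at offset " + i);
            tokens.add(m.group(1));
            i = m.end();
        }
    }

    // Parses the expression once; the returned condition is evaluated per record.
    static Cond parse(String sql) {
        WhereFilter p = new WhereFilter(sql.trim());
        Cond cond = p.parseOr();
        if (p.pos != p.tokens.size()) {
            throw new IllegalArgumentException("Unexpected token: " + p.tokens.get(p.pos));
        }
        return cond;
    }

    private Cond parseOr() {
        Cond left = parseAnd();
        while (accept("OR")) {
            Cond l = left, r = parseAnd();
            left = rec -> l.test(rec) || r.test(rec);
        }
        return left;
    }

    // AND binds tighter than OR, as in SQL.
    private Cond parseAnd() {
        Cond left = parseAtom();
        while (accept("AND")) {
            Cond l = left, r = parseAtom();
            left = rec -> l.test(rec) && r.test(rec);
        }
        return left;
    }

    private Cond parseAtom() {
        if (accept("(")) {
            Cond inner = parseOr();
            if (!accept(")")) throw new IllegalArgumentException("Expected ')'");
            return inner;
        }
        String field = next(), op = next(), lit = next();
        if (!OPS.contains(op)) throw new IllegalArgumentException("Unsupported operator: " + op);
        Object value = lit.startsWith("'") ? lit.substring(1, lit.length() - 1) : Double.valueOf(lit);
        return rec -> compare(rec.get(field), op, value);
    }

    // Keywords are matched case-insensitively.
    private boolean accept(String token) {
        if (pos < tokens.size() && tokens.get(pos).equalsIgnoreCase(token)) { pos++; return true; }
        return false;
    }

    private String next() {
        if (pos >= tokens.size()) throw new IllegalArgumentException("Unexpected end of expression");
        return tokens.get(pos++);
    }

    // SQL-like: a missing (NULL) or mistyped field never matches.
    private static boolean compare(Object actual, String op, Object expected) {
        int c;
        if (expected instanceof Double) {
            if (!(actual instanceof Number)) return false;
            c = Double.compare(((Number) actual).doubleValue(), (Double) expected);
        } else {
            if (actual == null) return false;
            c = actual.toString().compareTo((String) expected);
        }
        switch (op) {
            case "=":  return c == 0;
            case "<>": return c != 0;
            case "<":  return c < 0;
            case ">":  return c > 0;
            case "<=": return c <= 0;
            default:   return c >= 0; // ">=", already validated in parseAtom
        }
    }
}
```

Because `Cond` is `Serializable`, a DataStream job could wrap it as `stream.filter(rec -> cond.test(rec)).uid("where-filter").name("user = 'pitter' AND age > 10")`. The operator then stays in the DataStream API, where uid and name can be set. For a POJO such as `SomePOJO`, `rec.get(field)` would have to be replaced with a field accessor (e.g. via reflection). The trade-off is that type coercion and NULL handling only approximate SQL semantics, which is exactly what reusing the planner would avoid.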

Best,
Jark

[1]:
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala

On Fri, 5 Jun 2020 at 10:17, Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, Theo
>
> From my understanding, it's currently hard to do this in your DataStream
> application, because converting a SQL expression into a Flink operator
> happens in the underlying table planner (more precisely, in the code
> generation phase), and the planner does not expose an interface to users,
> so you cannot assign an operator name or operator ID.
>
> Best,
> Leonard Xu
>
> On 5 Jun 2020, at 00:18, Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:
>
> Hi there,
>
> I have a pure DataStream pipeline (i.e. no higher-level APIs like Table or
> SQL are involved). In the middle of the pipeline, I now need to filter on a
> SQL WHERE expression, e.g. "user = 'pitter' AND age > 10", which can include
> REGEXP and arbitrarily nested AND/OR constructs.
>
> I was wondering whether I could somehow transform a SQL WHERE expression
> into a Flink FilterFunction.
>
> My approach right now is to register my stream as a table, run a SQL query
> on it, and convert the result back to a DataStream, like so:
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment();
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
> List<SomePOJO> data = createPOJOTestData();
> DataStream<SomePOJO> stream = env.fromCollection(data);
>
> //final Table asTable = tEnv.fromDataStream(stream);
> //Table filteredTable = asTable.where("user === 'pitter' && age > 10"); // no SQL-style 'AND' possible here...
>
> tEnv.registerDataStream("SAMPLE", stream);
> // `user` is a reserved keyword in Flink SQL, so the column is escaped with backticks
> Table filteredTable = tEnv.sqlQuery(
>     "SELECT * FROM SAMPLE WHERE `user` = 'pitter' AND age > 10");
>
> stream = tEnv.toAppendStream(filteredTable, SomePOJO.class);
> List<SomePOJO> list = IteratorUtils.toList(DataStreamUtils.collect(stream));
>
> //... test assertions
>
>
> It feels a bit odd that I need to go all the way up to the SQL API and
> register a table "just" to apply a WHERE clause, and that, by leaving the
> DataStream world, I can no longer assign a uid or operator name to this
> operator.
>
> Is the way I wrote it the best approach, or do you have a better idea?
> Are there any caveats here? Note that I deliberately didn't assign the
> event-time column, as I know that it's just a WHERE without any windowing
> etc., and I wanted to test that it still works without any explicit time
> column :)
>
> Best regards
> Theo
