Hi Theo,

As far as I understand, this is currently hard to do in your DataStream application. The conversion of a SQL expression into Flink operators happens inside the underlying table planner (more precisely, in its code generation phase), and the planner does not expose an interface to users, so you cannot assign an operator name or operator ID (uid) to the resulting operator.
Best,
Leonard Xu

> On June 5, 2020, at 00:18, Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:
>
> Hi there,
>
> I have a full DataStream pipeline (i.e. no upper-level APIs like Table or SQL
> are involved). In the middle of the pipeline, I now need to filter on a SQL
> WHERE expression, e.g. "user = 'pitter' AND age > 10", which can include
> REGEXP and arbitrarily nested AND/OR constructs.
>
> I was wondering if I could somehow transform a SQL WHERE expression into a
> Flink FilterFunction?
>
> My approach right now is to register my stream as a table, run a SQL query on
> it and convert the result back to a DataStream, like so:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
> List<SomePOJO> data = createPOJOTestData();
> DataStream<SomePOJO> stream = env.fromCollection(data);
>
> //final Table asTable = tEnv.fromDataStream(stream);
> //Table filteredTable = asTable.where("user === 'pitter' && age > 10"); // NO SQL-style 'AND' possible here...
>
> tEnv.registerDataStream("SAMPLE", stream);
> Table filteredTable = tEnv.sqlQuery("SELECT * FROM SAMPLE WHERE user = 'pitter' AND age > 10");
>
> stream = tEnv.toAppendStream(filteredTable, SomePOJO.class);
> List<SomePOJO> list = IteratorUtils.toList(DataStreamUtils.collect(stream));
> //... test assertions
>
> It feels a bit weird that I need to go all the way up to the SQL API and
> register a table "just" to apply a WHERE clause, and that I can't assign a
> uid or operator name to this operator anymore, since it leaves the
> DataStream world.
>
> Is the way I wrote it the best approach, or do you have a better idea? Are
> there any caveats here? Note that I didn't assign the event time column on
> purpose, as I know that it's just a WHERE without any windowing etc., and I
> wanted to test that it still works without any explicit time column :)
>
> Best regards
> Theo
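One way to stay entirely in the DataStream API, so that uid and operator name remain assignable, is to compile the WHERE string yourself into a predicate that a FilterFunction can call. The sketch below is a hypothetical, JDK-only illustration, not a Flink or Calcite API: the names `WhereCompiler`, `compile` and `row` are invented, records are assumed to be `Map<String, Object>` rather than `SomePOJO`, and only `=`, `<`, `>`, `REGEXP` (treated as a partial, substring match), `AND`, `OR` and parentheses are supported, with `AND` binding tighter than `OR`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: compiles a subset of SQL WHERE syntax (=, <, >, REGEXP, AND, OR,
// parentheses; AND binds tighter than OR) into a Predicate over Map-based records.
public class WhereCompiler {
    // Token: quoted string, integer, word (field name or keyword), or one of ( ) = < >.
    private static final Pattern TOKEN =
            Pattern.compile("\\s*('[^']*'|\\d+|[A-Za-z_][A-Za-z0-9_]*|[()=<>])");

    private final List<String> tokens = new ArrayList<>();
    private int pos = 0;
    private WhereCompiler(String where) {
        Matcher m = TOKEN.matcher(where);
        int end = 0;
        while (end < where.length()) {
            m.region(end, where.length());
            if (!m.lookingAt()) break;
            tokens.add(m.group(1));
            end = m.end();
        }
        if (!where.substring(end).trim().isEmpty())
            throw new IllegalArgumentException("Cannot tokenize at offset " + end);
    }

    public static Predicate<Map<String, Object>> compile(String where) {
        WhereCompiler c = new WhereCompiler(where);
        Predicate<Map<String, Object>> result = c.parseOr();
        if (c.pos != c.tokens.size())
            throw new IllegalArgumentException("Unexpected token: " + c.tokens.get(c.pos));
        return result;
    }

    // Consumes the next token if it matches (case-insensitive, so "and" works like "AND").
    private boolean accept(String expected) {
        boolean ok = pos < tokens.size() && tokens.get(pos).equalsIgnoreCase(expected);
        if (ok) pos++;
        return ok;
    }

    private String next() {
        if (pos >= tokens.size()) throw new IllegalArgumentException("Unexpected end of expression");
        return tokens.get(pos++);
    }

    // or := and (OR and)*
    private Predicate<Map<String, Object>> parseOr() {
        Predicate<Map<String, Object>> left = parseAnd();
        while (accept("OR")) left = left.or(parseAnd());
        return left;
    }

    // and := primary (AND primary)*
    private Predicate<Map<String, Object>> parseAnd() {
        Predicate<Map<String, Object>> left = parsePrimary();
        while (accept("AND")) left = left.and(parsePrimary());
        return left;
    }

    // primary := '(' or ')' | field op literal
    private Predicate<Map<String, Object>> parsePrimary() {
        if (accept("(")) {
            Predicate<Map<String, Object>> inner = parseOr();
            if (!accept(")")) throw new IllegalArgumentException("Missing ')'");
            return inner;
        }
        String field = next(), op = next(), literal = next();
        if (literal.startsWith("'")) {
            String text = literal.substring(1, literal.length() - 1);
            if (op.equals("=")) return r -> text.equals(r.get(field));
            if (op.equalsIgnoreCase("REGEXP")) {
                Pattern regex = Pattern.compile(text);
                // Partial match: true if any substring matches (an assumption for this sketch).
                return r -> r.get(field) != null && regex.matcher(r.get(field).toString()).find();
            }
        } else {
            long n = Long.parseLong(literal); // NumberFormatException if not an integer
            switch (op) {
                case "=": return r -> r.get(field) instanceof Number && ((Number) r.get(field)).longValue() == n;
                case ">": return r -> r.get(field) instanceof Number && ((Number) r.get(field)).longValue() > n;
                case "<": return r -> r.get(field) instanceof Number && ((Number) r.get(field)).longValue() < n;
            }
        }
        throw new IllegalArgumentException("Unsupported condition: " + field + " " + op + " " + literal);
    }

    // Example record with the two fields used in the thread's query.
    static Map<String, Object> row(String user, int age) {
        Map<String, Object> r = new HashMap<>();
        r.put("user", user);
        r.put("age", age);
        return r;
    }
}
```

In a job, the composed `Predicate` is not `Serializable`, so a safer shape would be a `RichFilterFunction` that stores only the WHERE string and calls `compile` in `open()`; the operator returned by `stream.filter(...)` can then get `.uid(...)` and `.name(...)` as usual. The trade-off is that you maintain the grammar yourself, and SQL semantics such as NULL handling and type coercion are not reproduced.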