Hi there,
I have a pure DataStream pipeline (i.e. no higher-level APIs like Table or SQL
are involved). In the middle of the pipeline, I now need to filter on a SQL WHERE
expression, e.g. "user = 'pitter' AND age > 10", which can include REGEXP and
arbitrarily nested AND/OR constructs.
I was wondering if I could somehow transform a SQL WHERE expression into a
Flink FilterFunction?
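One way to stay entirely in the DataStream API might be to compile the WHERE string into a predicate once, on the client, and evaluate it inside an ordinary filter. The sketch below is a hand-written recursive-descent parser, not a Flink or Calcite API. The class `WhereCompiler`, its `RowPredicate` interface, and the choice to represent a record as a `Map<String, Object>` are all hypothetical. It covers only a small subset: `AND`/`OR`/`NOT` with SQL precedence, parentheses, and `=`, `<>`, `<`, `<=`, `>`, `>=` against single-quoted strings or non-negative integer literals. It also supports an infix `field REGEXP 'pattern'`, which, as an assumption borrowed from MySQL, is true if any substring matches. NULL handling is simplified: a missing field makes a comparison false, so `NOT` does not follow SQL's three-valued logic.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not a Flink API: compiles a small subset of SQL WHERE syntax
// into a serializable predicate over a row represented as Map<String, Object>.
public class WhereCompiler {
    // Serializable, so that a Flink FilterFunction lambda could capture it.
    public interface RowPredicate extends Predicate<Map<String, Object>>, Serializable {}

    // One token: quoted string ('' escapes a quote), integer, comparison operator,
    // parenthesis, or a word (field name or keyword such as AND/OR/NOT/REGEXP).
    private static final Pattern TOKEN = Pattern.compile(
        "\\s*('(?:[^']|'')*'|\\d+|<=|>=|<>|[=<>()]|[A-Za-z_][A-Za-z0-9_]*)");
    private static final List<String> COMPARISONS = Arrays.asList("=", "<>", "<", "<=", ">", ">=");
    private final List<String> tokens = new ArrayList<>();
    private int pos = 0;

    private WhereCompiler(String expr) {
        Matcher m = TOKEN.matcher(expr);
        for (int i = 0; !expr.substring(i).trim().isEmpty(); i = m.end()) {
            m.region(i, expr.length());
            if (!m.lookingAt()) throw new IllegalArgumentException("Bad input at: " + expr.substring(i));
            tokens.add(m.group(1));
        }
    }

    public static RowPredicate compile(String expr) {
        WhereCompiler c = new WhereCompiler(expr);
        RowPredicate p = c.parseOr();
        if (c.pos != c.tokens.size()) throw new IllegalArgumentException("Unexpected: " + c.tokens.get(c.pos));
        return p;
    }
    private String next() {
        if (pos >= tokens.size()) throw new IllegalArgumentException("Unexpected end of expression");
        return tokens.get(pos++);
    }
    // Consumes the next token if it equals `word` (case-insensitive).
    private boolean accept(String word) {
        boolean hit = pos < tokens.size() && word.equalsIgnoreCase(tokens.get(pos));
        if (hit) pos++;
        return hit;
    }
    // or := and ('OR' and)*   (AND binds tighter than OR, as in SQL)
    private RowPredicate parseOr() {
        RowPredicate left = parseAnd();
        while (accept("OR")) { RowPredicate l = left, r = parseAnd(); left = row -> l.test(row) || r.test(row); }
        return left;
    }
    // and := not ('AND' not)*
    private RowPredicate parseAnd() {
        RowPredicate left = parseNot();
        while (accept("AND")) { RowPredicate l = left, r = parseNot(); left = row -> l.test(row) && r.test(row); }
        return left;
    }
    // not := 'NOT' not | '(' or ')' | comparison
    private RowPredicate parseNot() {
        if (accept("NOT")) { RowPredicate inner = parseNot(); return row -> !inner.test(row); }
        if (!accept("(")) return parseComparison();
        RowPredicate inner = parseOr();
        if (!accept(")")) throw new IllegalArgumentException("Expected ')'");
        return inner;
    }

    // comparison := field op literal | field REGEXP 'pattern'
    private RowPredicate parseComparison() {
        String field = next();
        if (!field.matches("[A-Za-z_][A-Za-z0-9_]*")) throw new IllegalArgumentException("Expected field: " + field);
        String op = next().toUpperCase();
        String lit = next();
        if (op.equals("REGEXP")) { // assumption: MySQL-style, true if any substring matches
            Pattern regex = Pattern.compile(stringLiteral(lit));
            return row -> row.get(field) != null && regex.matcher(row.get(field).toString()).find();
        }
        if (!COMPARISONS.contains(op)) throw new IllegalArgumentException("Unknown operator: " + op);
        Object literal = lit.startsWith("'") ? stringLiteral(lit) : Long.valueOf(lit);
        return row -> {
            Object v = row.get(field);
            if (v == null) return false; // simplified: no SQL three-valued NULL logic
            int c = (v instanceof Number && literal instanceof Number)
                ? Long.compare(((Number) v).longValue(), ((Number) literal).longValue())
                : v.toString().compareTo(literal.toString());
            return op.equals("=") ? c == 0 : op.equals("<>") ? c != 0 : op.equals("<") ? c < 0
                 : op.equals("<=") ? c <= 0 : op.equals(">") ? c > 0 : c >= 0;
        };
    }

    private static String stringLiteral(String tok) {
        if (!tok.startsWith("'")) throw new IllegalArgumentException("Expected string literal: " + tok);
        return tok.substring(1, tok.length() - 1).replace("''", "'");
    }
}
```

The predicate is `Serializable` because a Flink `FilterFunction` is serialized and shipped to the task managers, so whatever its lambda captures must be serializable too. In the pipeline it could back a plain filter that keeps `uid(...)` and `name(...)` available, e.g. `stream.filter(pojo -> pred.test(toRow(pojo))).uid("where-filter").name("where-filter")`, where `pred = WhereCompiler.compile(...)` is built once up front and `toRow` is a hypothetical helper you would write to expose the POJO's fields as a map. For full SQL semantics (functions, NULLs, type coercion), going through the Table/SQL API remains the more complete option.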
My approach right now is to register my Stream as a table, run a SQL query on
it and return back to a DataStream like so:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

List<SomePOJO> data = createPOJOTestData();
DataStream<SomePOJO> stream = env.fromCollection(data);

// Table API variant: NO SQL-style 'AND' possible in this expression syntax...
//final Table asTable = tEnv.fromDataStream(stream);
//Table filteredTable = asTable.where("user === 'pitter' && age > 10");

// Register the stream as a table and apply the WHERE clause via SQL.
// `user` is a reserved keyword in Flink SQL, hence the backticks.
tEnv.registerDataStream("SAMPLE", stream);
Table filteredTable = tEnv.sqlQuery("SELECT * FROM SAMPLE WHERE `user` = 'pitter' AND age > 10");

// Convert the insert-only result back into a DataStream of the original POJO type.
stream = tEnv.toAppendStream(filteredTable, SomePOJO.class);
List<SomePOJO> list = IteratorUtils.toList(DataStreamUtils.collect(stream));
//... test assertions
It feels a bit odd that I have to go all the way up to the SQL API and register
a table "just" to apply a WHERE clause, and that, once I leave the DataStream
world, I can no longer assign a uid or operator name to this operator.
Is the way I wrote it the best approach, or do you have a better idea?
Are there any caveats here? Note that I deliberately didn't assign the event-time
column: it's just a WHERE clause without any windowing etc., and I wanted to test
that it still works without an explicit time column :)
Best regards
Theo