Hello Flink Community,

We have a question about how Flink SQL optimizes SQL statements, and whether this can be influenced.

For the sake of simplicity, let's assume we have three small tables: one input table and two output tables. The input table contains an array that we want to:

* flatten,
* do some (resource-expensive) enrichment and pre-filtering on, and
* write to one or more output tables, depending on a filter.

CREATE TABLE inputTbl (
  requestId     STRING NOT NULL,
  customers     ARRAY<STRING NOT NULL>,
  ingestionTime TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR ingestionTime AS ingestionTime
) WITH ( … );

CREATE TABLE outputATbl (
  requestId  STRING NOT NULL,
  customer   STRING NOT NULL,
  enrichment STRING NOT NULL
) WITH ( … );

CREATE TABLE outputBTbl (
  requestId  STRING NOT NULL,
  customer   STRING NOT NULL,
  enrichment STRING NOT NULL
) WITH ( … );

We have a UDF that returns the enrichment value for a customer; this UDF is resource-expensive.

CREATE TEMPORARY FUNCTION ENRICH AS '…';

We create a view to do the flattening (e.g. a row ('r1a', ['c1', 'c2']) is flattened into the two rows ('r1a', 'c1') and ('r1a', 'c2')):

CREATE VIEW flatView AS
  SELECT
    i.requestId AS requestId,
    t.customer  AS customer
  FROM inputTbl AS i
  CROSS JOIN UNNEST(customers) AS t(customer);

We create a second view to do the pre-filtering and enrichment:

CREATE VIEW enrichmentView AS
  SELECT
    requestId        AS requestId,
    customer         AS customer,
    ENRICH(customer) AS enrichment
  FROM flatView
  WHERE requestId NOT LIKE '%d';

After that we can simply write the data to the output tables using the following INSERT statements:

INSERT INTO outputATbl
  SELECT requestId, customer, enrichment
  FROM enrichmentView
  WHERE requestId LIKE '%a';

INSERT INTO outputBTbl
  SELECT requestId, customer, enrichment
  FROM enrichmentView
  WHERE requestId LIKE '%b';
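For completeness: we submit both INSERT statements together, so they are compiled into one job (hence the single plan with two sinks below). A minimal sketch of the submission, assuming a recent SQL client's statement-set syntax:

EXECUTE STATEMENT SET
BEGIN
  INSERT INTO outputATbl
    SELECT requestId, customer, enrichment FROM enrichmentView WHERE requestId LIKE '%a';
  INSERT INTO outputBTbl
    SELECT requestId, customer, enrichment FROM enrichmentView WHERE requestId LIKE '%b';
END;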
Functionally this works correctly, but when you look at the optimized execution plan, the UNNEST (Correlate) is done once per sink, and the UDF ENRICH() is also invoked once per sink. Both lead to unwanted/unnecessary processing and resource usage.

== Optimized Execution Plan ==
Calc(select=[requestId, customers, Reinterpret(CAST(ingestionTime AS TIMESTAMP(3) *ROWTIME*)) AS ingestionTime])(reuse_id=[1])
+- TableSourceScan(table=[[ing, sdav2, inputTbl, metadata=[timestamp], watermark=[CAST(ingestionTime AS TIMESTAMP(3) *ROWTIME*)], idletimeout=[5], watermarkEmitStrategy=[on-periodic]]], fields=[requestId, customers, ingestionTime])

Sink(table=[ing.sdav2.outputATbl], fields=[requestId, customer, enrichment])
+- Calc(select=[requestId, f0 AS customer, ENRICH(f0) AS enrichment])
   +- Correlate(invocation=[$UNNEST_ROWS$1($cor2.customers)], correlate=[table($UNNEST_ROWS$1($cor2.customers))], select=[requestId,customers,ingestionTime,f0], rowType=[RecordType(VARCHAR(2147483647) requestId, VARCHAR(2147483647) ARRAY customers, TIMESTAMP(3) *ROWTIME* ingestionTime, VARCHAR(2147483647) f0)], joinType=[INNER], condition=[AND(LIKE($0, _UTF-16LE'%a'), NOT(LIKE($0, _UTF-16LE'%d')))])
      +- Reused(reference_id=[1])

Sink(table=[ing.sdav2.outputBTbl], fields=[requestId, customer, enrichment])
+- Calc(select=[requestId, f0 AS customer, ENRICH(f0) AS enrichment])
   +- Correlate(invocation=[$UNNEST_ROWS$1($cor3.customers)], correlate=[table($UNNEST_ROWS$1($cor3.customers))], select=[requestId,customers,ingestionTime,f0], rowType=[RecordType(VARCHAR(2147483647) requestId, VARCHAR(2147483647) ARRAY customers, TIMESTAMP(3) *ROWTIME* ingestionTime, VARCHAR(2147483647) f0)], joinType=[INNER], condition=[AND(LIKE($0, _UTF-16LE'%b'), NOT(LIKE($0, _UTF-16LE'%d')))])
      +- Reused(reference_id=[1])

Note that the per-sink filters (LIKE '%a' / LIKE '%b') have been pushed down into the respective Correlate conditions; presumably this makes the two subtrees non-identical, so subplan reuse can only kick in at the shared source scan (reuse_id=[1]).
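As far as we can tell, the reuse that does happen is controlled by the planner's reuse options, and these already default to true, so simply enabling them is not the answer. A sketch of the settings we are referring to, assuming the SQL client's SET syntax:

SET 'table.optimizer.reuse-sub-plan-enabled' = 'true';  -- already the default
SET 'table.optimizer.reuse-source-enabled' = 'true';    -- already the default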
I would have expected an optimized execution plan more like the following, in which the pre-filter, the UNNEST, and the ENRICH() call are evaluated once and shared by both sinks:

== Expected Optimized Execution Plan ==
Calc(select=[requestId, f0 AS customer, ENRICH(f0) AS enrichment])(reuse_id=[1])
+- Correlate(invocation=[$UNNEST_ROWS$1($cor2.customers)], correlate=[table($UNNEST_ROWS$1($cor2.customers))], select=[requestId,customers,ingestionTime,f0], rowType=[RecordType(VARCHAR(2147483647) requestId, VARCHAR(2147483647) ARRAY customers, TIMESTAMP(3) *ROWTIME* ingestionTime, VARCHAR(2147483647) f0)], joinType=[INNER])
   +- Calc(select=[requestId, customers, Reinterpret(CAST(ingestionTime AS TIMESTAMP(3) *ROWTIME*)) AS ingestionTime], where=[NOT(LIKE(requestId, '%d'))])
      +- TableSourceScan(table=[[ing, sdav2, inputTbl, metadata=[timestamp], watermark=[CAST(ingestionTime AS TIMESTAMP(3) *ROWTIME*)], idletimeout=[5], watermarkEmitStrategy=[on-periodic]]], fields=[requestId, customers, ingestionTime])

Sink(table=[ing.sdav2.outputATbl], fields=[requestId, customer, enrichment])
+- Calc(select=[requestId, customer, enrichment], where=[LIKE(requestId, '%a')])
   +- Reused(reference_id=[1])

Sink(table=[ing.sdav2.outputBTbl], fields=[requestId, customer, enrichment])
+- Calc(select=[requestId, customer, enrichment], where=[LIKE(requestId, '%b')])
   +- Reused(reference_id=[1])

Is there a way to influence the optimizer so that we can mitigate the unwanted/unnecessary processing and resource usage? Or is there another design pattern we should use to solve this issue?

Regards,
Fred Teunissen