Hello Flink Community,

We have a question about how Flink SQL optimizes SQL statements and whether this can be influenced.
For the sake of simplicity, let's assume we have three small tables: one input table and two output tables.
The input table contains an array that we want
  *   to flatten,
  *   to enrich and pre-filter (the enrichment is resource-expensive), and
  *   to write to one or more output tables, depending on a filter.
CREATE TABLE inputTbl (
  requestId STRING NOT NULL,
  customers ARRAY<STRING NOT NULL>,
  ingestionTime TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR ingestionTime AS ingestionTime
) WITH (
  …
)

CREATE TABLE outputATbl (
  requestId STRING NOT NULL,
  customer STRING NOT NULL,
  enrichment STRING NOT NULL
) WITH (
  …
)

CREATE TABLE outputBTbl (
  requestId STRING NOT NULL,
  customer STRING NOT NULL,
  enrichment STRING NOT NULL
) WITH (
  …
)

We have a UDF that returns the enrichment value for a customer; this UDF is resource-expensive.
CREATE TEMPORARY FUNCTION ENRICH AS '…'
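
For illustration only (the customer id and the returned value here are hypothetical; the point is only that every call is expensive):

SELECT ENRICH(customer) AS enrichment
FROM (VALUES ('customer-1')) AS t(customer)
-- e.g. returns something like 'premium-tier'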

We create a view to do the flattening:
CREATE VIEW flatView AS
  SELECT
    i.requestId AS requestId,
    t.customer AS customer
  FROM inputTbl AS i
  CROSS JOIN UNNEST(customers) AS t(customer)
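
For illustration (hypothetical data), this turns one input row into one row per array element:

SELECT v.requestId, t.customer
FROM (VALUES ('req-1', ARRAY['c1', 'c2'])) AS v(requestId, customers)
CROSS JOIN UNNEST(v.customers) AS t(customer)
-- returns ('req-1', 'c1') and ('req-1', 'c2')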

We create a second view to do the enrichment and pre-filtering:
CREATE VIEW enrichmentView AS
  SELECT
    requestId AS requestId,
    customer AS customer,
    ENRICH(customer) AS enrichment
  FROM flatView
  WHERE requestId NOT LIKE '%d'
After that, we can simply write the data to the output tables using the following
INSERT statements.
INSERT INTO outputATbl
  SELECT requestId, customer, enrichment
  FROM enrichmentView
  WHERE requestId LIKE '%a'

INSERT INTO outputBTbl
  SELECT requestId, customer, enrichment
  FROM enrichmentView
  WHERE requestId LIKE '%b'
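
Both INSERT statements are submitted together as a single job; a minimal sketch of how, assuming the SQL client's statement set syntax:

EXECUTE STATEMENT SET
BEGIN
  INSERT INTO outputATbl
    SELECT requestId, customer, enrichment
    FROM enrichmentView
    WHERE requestId LIKE '%a';
  INSERT INTO outputBTbl
    SELECT requestId, customer, enrichment
    FROM enrichmentView
    WHERE requestId LIKE '%b';
END;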
Functionally this works correctly, but when you look at the Optimized Execution
Plan below, the UNNEST (Correlate) is executed once per sink, and the UDF ENRICH()
is likewise called in every sink. Both lead to unwanted and unnecessary processing
and resource usage.

== Optimized Execution Plan ==
Calc(select=[requestId, customers, Reinterpret(CAST(ingestionTime AS 
TIMESTAMP(3) *ROWTIME*)) AS ingestionTime])(reuse_id=[1])
+- TableSourceScan(table=[[ing, sdav2, inputTbl, metadata=[timestamp], 
watermark=[CAST(ingestionTime AS TIMESTAMP(3) *ROWTIME*)], idletimeout=[5], 
watermarkEmitStrategy=[on-periodic]]], fields=[requestId, customers, 
ingestionTime])

Sink(table=[ing.sdav2.outputATbl], fields=[requestId, customer, enrichment])
+- Calc(select=[requestId, f0 AS customer, ENRICH(f0) AS enrichment])
   +- Correlate(invocation=[$UNNEST_ROWS$1($cor2.customers)], 
correlate=[table($UNNEST_ROWS$1($cor2.customers))], 
select=[requestId,customers,ingestionTime,f0], 
rowType=[RecordType(VARCHAR(2147483647) requestId, VARCHAR(2147483647) ARRAY 
customers, TIMESTAMP(3) *ROWTIME* ingestionTime, VARCHAR(2147483647) f0)], 
joinType=[INNER], condition=[AND(LIKE($0, _UTF-16LE'%a'), NOT(LIKE($0, 
_UTF-16LE'%d')))])
      +- Reused(reference_id=[1])

Sink(table=[ing.sdav2.outputBTbl], fields=[requestId, customer, enrichment])
+- Calc(select=[requestId, f0 AS customer, ENRICH(f0) AS enrichment])
   +- Correlate(invocation=[$UNNEST_ROWS$1($cor3.customers)], 
correlate=[table($UNNEST_ROWS$1($cor3.customers))], 
select=[requestId,customers,ingestionTime,f0], 
rowType=[RecordType(VARCHAR(2147483647) requestId, VARCHAR(2147483647) ARRAY 
customers, TIMESTAMP(3) *ROWTIME* ingestionTime, VARCHAR(2147483647) f0)], 
joinType=[INNER], condition=[AND(LIKE($0, _UTF-16LE'%b'), NOT(LIKE($0, 
_UTF-16LE'%d')))])
      +- Reused(reference_id=[1])

I would have expected an Optimized Execution Plan more like the following:

== Expected Optimized Execution Plan ==
Calc(select=[requestId, f0 AS customer, ENRICH(f0) AS enrichment])(reuse_id=[1])
+- Correlate(invocation=[$UNNEST_ROWS$1($cor2.customers)], 
correlate=[table($UNNEST_ROWS$1($cor2.customers))], 
select=[requestId,customers,ingestionTime,f0], 
rowType=[RecordType(VARCHAR(2147483647) requestId, VARCHAR(2147483647) ARRAY 
customers, TIMESTAMP(3) *ROWTIME* ingestionTime, VARCHAR(2147483647) f0)], 
joinType=[INNER])
   +- Calc(select=[requestId, customers, Reinterpret(CAST(ingestionTime AS 
TIMESTAMP(3) *ROWTIME*)) AS ingestionTime], where=[NOT(LIKE(requestId, '%d'))])
      +- TableSourceScan(table=[[ing, sdav2, inputTbl, metadata=[timestamp], 
watermark=[CAST(ingestionTime AS TIMESTAMP(3) *ROWTIME*)], idletimeout=[5], 
watermarkEmitStrategy=[on-periodic]]], fields=[requestId, customers, 
ingestionTime])

Sink(table=[ing.sdav2.outputATbl], fields=[requestId, customer, enrichment])
+- Calc(select=[requestId, customer, enrichment], where=[LIKE(requestId, '%a')])
   +- Reused(reference_id=[1])

Sink(table=[ing.sdav2.outputBTbl], fields=[requestId, customer, enrichment])
+- Calc(select=[requestId, customer, enrichment], where=[LIKE(requestId, '%b')])
   +- Reused(reference_id=[1])
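
For reference, the only reuse-related optimizer options we are aware of are the two below (assuming these are the relevant knobs; both appear to default to 'true', which matches the Reused(reference_id=[1]) node for the source scan, yet the reuse does not extend to the Correlate or the ENRICH() call):

SET 'table.optimizer.reuse-sub-plan-enabled' = 'true'
SET 'table.optimizer.reuse-source-enabled' = 'true'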

Is there a way to influence the optimizer so that we can avoid this unnecessary
processing and resource usage?
Or is there another design pattern we should use to solve this issue?

Regards,
Fred Teunissen

