[ 
https://issues.apache.org/jira/browse/FLINK-38871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal Kamlapure updated FLINK-38871:
-------------------------------------
    Description: 
I have identified a correctness/performance bug in the Flink Table Planner 
regarding the optimization of {{ROW}} fields returned by Python UDFs.

When a Python UDF returns a {{ROW}} type and a field from that {{ROW}} is 
materialized into a top-level column (using {{add_or_replace_columns}}) and 
subsequently filtered, the planner aggressively applies *Constant Propagation*.

If this column is used as input for a _second_ downstream Python UDF, the 
planner replaces the actual column reference with the constant literal derived 
from the filter (e.g., rewriting {{amountId}} to {{'0'}}). This causes the 
downstream UDF to execute on rows that *do not satisfy the filter predicate*, 
receiving the constant value instead of the actual data.

This behavior breaks filter semantics and corrupts intermediate data observed 
by UDFs. Notably, this issue *does not occur* when the upstream UDF returns a 
{{MAP}} type, suggesting the issue is specific to {{ROW}} field optimization 
rules.
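
To make the difference concrete, here is a toy model in plain Python with no Flink involved. It is only a sketch of the semantics described above, not of the planner itself; the function names {{expected_pipeline}} and {{reported_pipeline}} are hypothetical and exist only for this illustration.

```python
# Toy model of the reported behaviour; plain Python, no Flink involved.
rows = [("A", "0"), ("B", "1"), ("C", "0")]  # (Text, amount), as in the report


def second_udf(amounts):
    """Stand-in for the downstream UDF: echoes the batch it is given."""
    return list(amounts)


def expected_pipeline(rows):
    # UDF -> Filter -> UDF: the filter is enforced before the second UDF runs,
    # so the UDF only ever sees values that satisfy amount == "0".
    kept = [row for row in rows if row[1] == "0"]
    return second_udf(amount for _, amount in kept)


def reported_pipeline(rows):
    # The column reference is replaced by the literal taken from the predicate
    # before any row has been dropped, so every row reaches the UDF as "0".
    return second_udf("0" for _ in rows)


expected_seen = expected_pipeline(rows)
reported_seen = reported_pipeline(rows)
print("expected:", expected_seen)  # two values, both "0"
print("reported:", reported_seen)  # three values, row "B" now looks like "0"
```

The telling difference is the batch length: the expected pipeline hands two values to the second UDF, while the reported behaviour hands it three.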
h3. Reproduction Script (PyFlink)

The following script reproduces the issue. It builds a three-row table whose 
{{amount}} column holds the values {{"0"}} and {{"1"}}. We filter for {{"0"}}, 
but the downstream UDF prints {{"0"}} for *all* rows, showing that the value 
{{"1"}} was overwritten by the planner.
{code:python}
from pyflink.table import (
    EnvironmentSettings,
    TableEnvironment,
    DataTypes
)
from pyflink.table.udf import udf
from pyflink.table.expressions import col
import pandas as pd

# ------------------------------------------------------------------------------
# 1. Table environment
# ------------------------------------------------------------------------------

env_settings = EnvironmentSettings.in_batch_mode()
t_env = TableEnvironment.create(env_settings)

# ------------------------------------------------------------------------------
# 2. Input table (NO connector, pure Python)
# ------------------------------------------------------------------------------

t = t_env.from_elements(
    [
        ("A", "0"),   # credit
        ("B", "1"),   # debit  <-- should be filtered out
        ("C", "0"),   # credit
    ],
    DataTypes.ROW([
        DataTypes.FIELD("Text", DataTypes.STRING()),
        DataTypes.FIELD("amount", DataTypes.STRING())
    ])
)

# ------------------------------------------------------------------------------
# 3. FIRST Python UDF (returns ROW)
# ------------------------------------------------------------------------------

@udf(
    result_type=DataTypes.ROW([
        DataTypes.FIELD("out_text", DataTypes.STRING()),
        DataTypes.FIELD("out_amount", DataTypes.STRING())
    ]),
    func_type="pandas"
)
def validate_udf(text: pd.Series, amount: pd.Series) -> pd.DataFrame:
    return pd.DataFrame({
        "out_text": text,
        "out_amount": amount
    })

# ------------------------------------------------------------------------------
# 4. SECOND Python UDF (just echoes input, used to show leak)
# ------------------------------------------------------------------------------

@udf(
    result_type=DataTypes.ROW([
        DataTypes.FIELD("seen_amount", DataTypes.STRING())
    ]),
    func_type="pandas"
)
def second_udf(amount: pd.Series) -> pd.DataFrame:
    print("\n[second_udf] received values:")
    print(amount.tolist())
    return pd.DataFrame({
        "seen_amount": amount
    })

# ------------------------------------------------------------------------------
# 5. Pipeline that TRIGGERS the bug
# ------------------------------------------------------------------------------

# Step 1: apply first Python UDF (ROW output)
validated = t.add_columns(
    validate_udf(t.Text, t.amount).alias("v")
)

# Step 2: materialize ROW fields back into base columns  <<< CRITICAL
materialized = validated.add_or_replace_columns(
    col("v").out_text.alias("Text"),
    col("v").out_amount.alias("amount")
)

# Step 3: filter (should keep only amount == "0")
filtered = materialized.filter(col("amount") == "0")

# Step 4: pass filtered column into SECOND Python UDF
final = filtered.add_columns(
    second_udf(filtered.amount).alias("s")
).select(
    col("Text"),
    col("amount"),
    col("s").seen_amount
)

# ------------------------------------------------------------------------------
# 6. Observe execution plan and output
# ------------------------------------------------------------------------------

print("\n===== EXECUTION PLAN =====")
print(final.explain())

print("\n===== FINAL OUTPUT =====")
final.execute().print()
{code}
h3. Observed Behavior

The logs from {{second_udf}} show that it receives the value {{'0'}} for the 
second row, even though that row actually contains {{'1'}}.
 
[second_udf] received batch: ['0', '0', '0'] 
_(Note: The batch size is 3, meaning the row with ID '1' was not dropped, but 
its value was overwritten to '0'.)_

The execution plan shows that the planner created a {{Calc}} node that 
projects a constant in place of the column before calling the Python UDF:

Calc(select=[..., CAST('0' AS VARCHAR) AS amountId, ...])
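
A quick way to check a plan for this symptom is to scan the text returned by {{final.explain()}} for literals that are cast and aliased to a column name. The helper below is a minimal sketch: {{find_constant_projections}} is a hypothetical name, and the regular expression only covers the exact shape of the excerpt above, so real plans may print the cast differently.

```python
import re
from typing import List, Tuple

# Matches projections shaped like: CAST('0' AS VARCHAR) AS amountId
# Assumption: the plan prints the cast exactly as in the excerpt from the report.
_CONST_PROJECTION = re.compile(
    r"CAST\('([^']*)'\s+AS\s+\w+(?:\(\d+\))?\)\s+AS\s+(\w+)"
)


def find_constant_projections(plan: str) -> List[Tuple[str, str]]:
    """Return (column, literal) pairs for every constant projected as a column."""
    return [(column, literal) for literal, column in _CONST_PROJECTION.findall(plan)]


# Plan fragment copied from the report.
sample_plan = "Calc(select=[..., CAST('0' AS VARCHAR) AS amountId, ...])"
hits = find_constant_projections(sample_plan)
print(hits)
```

In the reproduction script the same function could be applied to the string returned by {{final.explain()}}. A hit on the filtered column in a node that feeds a Python UDF would be consistent with the behavior described here.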
h3. Expected Behavior
 # The downstream UDF should only process rows that satisfy the filter (or if 
execution pipelining allows processing, it must see the *original* values).

 # The planner should not rewrite input columns to constants based on 
downstream filters if those columns are inputs to Python UDFs.

h3. Analysis & Workaround

The issue appears to be an unsafe application of constant propagation on 
{{ROW}} fields materialized from {{PythonCalc}} outputs.
 * *ROW Type (Buggy):* The planner views the {{ROW}} fields as transparent and 
applies constant propagation ({{amountId = '0'}}) _before_ the filter is 
physically enforced, and passes this constant to the next {{PythonCalc}}.

 * *MAP Type (Workaround):* Changing {{validate_udf}} to return 
{{MAP<STRING, STRING>}} fixes the issue. The planner treats the map access 
{{ITEM(map, 'key')}} as opaque, preventing constant propagation and forcing 
the correct execution order (UDF -> Filter -> UDF).

  was:
I have identified a correctness/performance bug in the Flink Table Planner 
regarding the optimization of {{ROW}} fields returned by Python UDFs.

When a Python UDF returns a {{ROW}} type and a field from that {{ROW}} is 
materialized into a top-level column (using {{add_or_replace_columns}}) and 
subsequently filtered, the planner aggressively applies *Constant Propagation*.

If this column is used as input for a _second_ downstream Python UDF, the 
planner replaces the actual column reference with the constant literal derived 
from the filter (e.g., rewriting {{amountId}} to {{'0'}}). This causes the 
downstream UDF to execute on rows that *do not satisfy the filter predicate*, 
receiving the constant value instead of the actual data.

This behavior breaks filter semantics and corrupts intermediate data observed 
by UDFs. Notably, this issue *does not occur* when the upstream UDF returns a 
{{MAP}} type, suggesting the issue is specific to {{ROW}} field optimization 
rules.
h3. Reproduction Script (PyFlink)

The following script reproduces the issue. It generates a batch with 
{{amountId}} values {{"0"}} and {{"1"}}. We filter for {{"0"}}, but the 
downstream UDF prints {{"0"}} for *all* rows, proving that the value {{"1"}} 
was overwritten by the planner.
{code:python}
from pyflink.table import EnvironmentSettings, TableEnvironment, DataTypes
from pyflink.table.udf import udf
from pyflink.table.expressions import col
import pandas as pd

def reproducer():
    env_settings = EnvironmentSettings.in_batch_mode()
    t_env = TableEnvironment.create(env_settings)

    # 1. Source Data: Row 'B' has amountId="1" (Should be filtered out)
    t = t_env.from_elements(
        [
            ("A", "0"),
            ("B", "1"), 
            ("C", "0"),
        ],
        DataTypes.ROW([
            DataTypes.FIELD("transactionText", DataTypes.STRING()),
            DataTypes.FIELD("amountId", DataTypes.STRING())
        ])
    )

    # 2. First UDF: Returns a ROW
    @udf(result_type=DataTypes.ROW([
            DataTypes.FIELD("out_text", DataTypes.STRING()),
            DataTypes.FIELD("out_amountId", DataTypes.STRING())
        ]), func_type="pandas")
    def validate_udf(text: pd.Series, amountId: pd.Series) -> pd.DataFrame:
        return pd.DataFrame({
            "out_text": text,
            "out_amountId": amountId
        })

    # 3. Second UDF: Inspects the value it receives
    @udf(result_type=DataTypes.ROW([
            DataTypes.FIELD("seen_amountId", DataTypes.STRING())
        ]), func_type="pandas")
    def second_udf(amountId: pd.Series) -> pd.DataFrame:
        # DEBUG: Print exactly what the UDF sees
        print("\n[second_udf] received batch:", amountId.tolist())
        return pd.DataFrame({
            "seen_amountId": amountId
        })

    # 4. Pipeline Construction
    validated = t.add_columns(
        validate_udf(t.transactionText, t.amountId).alias("v")
    )

    # Materialize ROW fields to top-level columns
    materialized = validated.add_or_replace_columns(
        col("v").out_text.alias("transactionText"),
        col("v").out_amountId.alias("amountId")
    )

    # Filter: We only want amountId == "0"
    filtered = materialized.filter(col("amountId") == "0")

    # Apply downstream UDF on the filtered data
    final = filtered.add_columns(
        second_udf(filtered.amountId).alias("s")
    ).select(
        col("transactionText"),
        col("amountId"),
        col("s").seen_amountId
    )

    print("=== PLAN ===")
    print(final.explain())

    print("=== EXECUTION ===")
    final.execute().print()

if __name__ == '__main__':
    reproducer()
{code}
h3. Observed Behavior

The logs from {{second_udf}} show that it receives the value {{'0'}} for the 
second row, even though that row actually contains {{'1'}}.
 
 [second_udf] received batch: ['0', '0', '0'] 
_(Note: The batch size is 3, meaning the row with ID '1' was not dropped, but 
its value was overwritten to '0'.)_

The execution plan reveals that the planner created a {{Calc}} node that 
forcefully casts the column to a constant before calling the Python UDF:

 Calc(select=[..., CAST('0' AS VARCHAR) AS amountId, ...])
h3. Expected Behavior
 # The downstream UDF should only process rows that satisfy the filter (or if 
execution pipelining allows processing, it must see the *original* values).

 # The planner should not rewrite input columns to constants based on 
downstream filters if those columns are inputs to Python UDFs.

h3. Analysis & Workaround

The issue appears to be an unsafe application of constant folding on {{ROW}} 
fields materialized from {{PythonCalc}} outputs.
 * *ROW Type (Buggy):* The planner views the {{ROW}} fields as transparent and 
applies constant propagation ({{amountId = '0'}}) _before_ the filter is 
physically enforced, and passes this constant to the next {{PythonCalc}}.

 * *MAP Type (Workaround):* Changing {{validate_udf}} to return 
{{MAP<STRING, STRING>}} fixes the issue. The planner treats the map access 
{{ITEM(map, 'key')}} as opaque, preventing constant propagation and forcing 
the correct execution order (UDF -> Filter -> UDF).


> PyFlink Planner incorrectly propagates constants into downstream UDF inputs 
> when filtering on materialized ROW fields
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38871
>                 URL: https://issues.apache.org/jira/browse/FLINK-38871
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Table SQL / Planner
>    Affects Versions: 1.19.1, 2.2.0
>         Environment: *Python :-* 3.11.14
> *PyFlink :-* 1.19.1 & 2.2.0
> *Java :-* 11
>            Reporter: Vishal Kamlapure
>            Priority: Major
>         Attachments: Screenshot 2026-01-08 at 2.50.40 AM.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
