Huang Xingbo created FLINK-24860:
------------------------------------

             Summary: Fix the wrong position mappings in the Python UDTF
                 Key: FLINK-24860
                 URL: https://issues.apache.org/jira/browse/FLINK-24860
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.13.3, 1.12.5
            Reporter: Huang Xingbo
            Assignee: Huang Xingbo
             Fix For: 1.12.6, 1.13.4


The failed example:
{code:python}
        @udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
        def StoTraceMqSourcePlugUDTF(s: str):
            import json
            try:
                data = json.loads(s)
            except Exception as e:
                return None
            source_code = "trace"
            try:
                shipment_no = data['shipMentNo']
            except Exception as e:
                return None
            yield source_code, shipment_no

        class StoTraceFindNameUDTF(TableFunction):
            def eval(self, shipment_no):
                yield shipment_no, shipment_no

        sto_trace_find_name = udtf(StoTraceFindNameUDTF(),
                                   result_types=[DataTypes.STRING(),
                                                 DataTypes.STRING()])

        # self.env.set_parallelism(1)
        self.t_env.create_temporary_system_function(
            "StoTraceMqSourcePlugUDTF", StoTraceMqSourcePlugUDTF)
        self.t_env.create_temporary_system_function(
            "sto_trace_find_name", sto_trace_find_name
        )
        source_table = self.t_env.from_elements([(
            '{"shipMentNo":"84210186879"}',)],
            ['biz_context'])
        # self.t_env.execute_sql(source_table)
        self.t_env.register_table("source_table", source_table)

        t = self.t_env.sql_query(
            "SELECT biz_context, source_code, shipment_no FROM source_table "
            "LEFT JOIN LATERAL TABLE(StoTraceMqSourcePlugUDTF(biz_context)) as "
            "T(source_code, shipment_no)"
            " ON TRUE")
        self.t_env.register_table("Table2", t)
        t = self.t_env.sql_query(
            "SELECT source_code, shipment_no, shipment_name, shipment_type FROM "
            "Table2 LEFT JOIN LATERAL TABLE(sto_trace_find_name(shipment_no)) as "
            "T(shipment_name, shipment_type)"
            " ON TRUE"
        )
        print(t.to_pandas())
{code}
In the failed example, the input arguments of the second Python Table Function
(sto_trace_find_name) have the wrong position mapping.
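
For reference, here is a plain-Python sketch (no Flink involved) of the row the two chained queries should produce when the argument positions are mapped correctly. The helper left_join_lateral and the function sto_trace_mq_source_plug are hypothetical stand-ins written for this illustration; they are not PyFlink APIs and do not reproduce the planner's internal logic.

```python
import json


def sto_trace_mq_source_plug(s):
    """Plain-Python stand-in for StoTraceMqSourcePlugUDTF."""
    try:
        data = json.loads(s)
        shipment_no = data['shipMentNo']
    except Exception:
        return  # emit no rows for malformed input
    yield "trace", shipment_no


def sto_trace_find_name(shipment_no):
    """Plain-Python stand-in for StoTraceFindNameUDTF.eval."""
    yield shipment_no, shipment_no


def left_join_lateral(rows, func, arg_index):
    """Hypothetical helper mimicking
    LEFT JOIN LATERAL TABLE(func(<column at arg_index>)) ON TRUE."""
    for row in rows:
        produced = list(func(row[arg_index]))
        if not produced:
            produced = [(None, None)]  # left join pads missing output with NULLs
        for out in produced:
            yield row + tuple(out)


source_table = [('{"shipMentNo":"84210186879"}',)]

# First query: biz_context is column 0 of source_table.
# Table2 columns: biz_context, source_code, shipment_no
table2 = list(left_join_lateral(source_table, sto_trace_mq_source_plug, 0))

# Second query: shipment_no is column 2 of Table2.
joined = left_join_lateral(table2, sto_trace_find_name, 2)

# Project source_code, shipment_no, shipment_name, shipment_type.
expected = [row[1:] for row in joined]
print(expected)
```

With the correct mapping, the second function receives column 2 of Table2 (shipment_no), so all three shipment columns carry the same value. Passing any other index to the second call hands the function a different column, which is the kind of mismatch this issue describes; the issue itself does not state which column was actually passed.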




--
This message was sent by Atlassian Jira
(v8.20.1#820001)
