[ https://issues.apache.org/jira/browse/FLINK-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17081768#comment-17081768 ]
Hequn Cheng edited comment on FLINK-17093 at 4/15/20, 8:42 AM: --------------------------------------------------------------- Resolved in 1.10.1 via 6f8d26d389022e7d901f69524e2e912b9c7e19ca in 1.11.0 via 48bf9fbd9a79416ce28d76291f23861cb2d38941 was (Author: hequn8128): Resolved in 1.10.1 via 6f8d26d389022e7d901f69524e2e912b9c7e19ca > Python UDF doesn't work when the input column is from composite field > --------------------------------------------------------------------- > > Key: FLINK-17093 > URL: https://issues.apache.org/jira/browse/FLINK-17093 > Project: Flink > Issue Type: Bug > Components: API / Python > Affects Versions: 1.10.0 > Reporter: Dian Fu > Assignee: Dian Fu > Priority: Blocker > Labels: pull-request-available > Fix For: 1.10.1, 1.11.0 > > Time Spent: 40m > Remaining Estimate: 0h > > For the following job: > {code} > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.table import BatchTableEnvironment, StreamTableEnvironment, > EnvironmentSettings, CsvTableSink > from pyflink.table.descriptors import Schema, Kafka, Json > from pyflink.table import DataTypes > from pyflink.table.udf import ScalarFunction, udf > import os > @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), > DataTypes.STRING()], > result_type=DataTypes.STRING()) > def get_host_ip(source, qr, sip, dip): > if source == "NGAF" and qr == '1': > return dip > return sip > @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), > DataTypes.STRING()], > result_type=DataTypes.STRING()) > def get_dns_server_ip(source, qr, sip, dip): > if source == "NGAF" and qr == '1': > return sip > return dip > def test_case(): > env = StreamExecutionEnvironment.get_execution_environment() > env.set_parallelism(1) > t_env = StreamTableEnvironment.create(env) > from pyflink.table import Row > table = t_env.from_elements( > [("DNS", Row(source="source", devid="devid", sip="sip", dip="dip", > qr="qr", queries="queries", answers="answers", 
qtypes="qtypes", > atypes="atypes", rcode="rcode", ts="ts",))], > DataTypes.ROW([DataTypes.FIELD("stype", DataTypes.STRING()), > DataTypes.FIELD("data", > DataTypes.ROW([DataTypes.FIELD('source', DataTypes.STRING()), > DataTypes.FIELD("devid", DataTypes.STRING()), > DataTypes.FIELD('sip', DataTypes.STRING()), > DataTypes.FIELD('dip', DataTypes.STRING()), > DataTypes.FIELD("qr", DataTypes.STRING()), > DataTypes.FIELD("queries", DataTypes.STRING()), > DataTypes.FIELD("answers", DataTypes.STRING()), > DataTypes.FIELD("qtypes", DataTypes.STRING()), > DataTypes.FIELD("atypes", DataTypes.STRING()), > DataTypes.FIELD("rcode", DataTypes.STRING()), > DataTypes.FIELD("ts", DataTypes.STRING())])) > ] > )) > result_file = "/tmp/test.csv" > if os.path.exists(result_file): > os.remove(result_file) > t_env.register_table_sink("Results", > CsvTableSink(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', > 'm', 'n'], > [DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), > DataTypes.STRING(), > DataTypes.STRING(), > DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), > DataTypes.STRING(), > DataTypes.STRING(), DataTypes.STRING(), > DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], > "/tmp/test.csv")) > t_env.register_function("get_host_ip", get_host_ip) > t_env.register_function("get_dns_server_ip", get_dns_server_ip) > t_env.register_table("source", table) > standard_table = t_env.sql_query("select data.*, stype as dns_type from > source")\ > .where("dns_type.in('DNSFULL', 'DNS', 'DNSFULL_FROM_LOG', 'DNS_FROM_LOG')") > t_env.register_table("standard_table", standard_table) > final_table = t_env.sql_query("SELECT *, get_host_ip(source, qr, sip, dip) > as host_ip," > "get_dns_server_ip(source, qr, sip, dip) as dns_server_ip FROM > standard_table") > final_table.insert_into("Results") > t_env.execute("test") > if __name__ == '__main__': > test_case() > {code} > The plan is as follows, which is not correct: > {code} > 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > KafkaTableSource(type, data) -> Map -> where: (IN(type, _UTF-16LE'DNSFULL', > _UTF-16LE'DNS', _UTF-16LE'DNSFULL_FROM_LOG', _UTF-16LE'DNS_FROM_LOG')), > select: (data, type) -> select: (type, get_host_ip(type.source, type.qr, > type.sip, type.dip) AS f0, get_dns_server_ip(type.source, type.qr, type.sip, > type.dip) AS f1) -> select: (f0.source AS source, f0.devid AS devid, f0.sip > AS sip, f0.dip AS dip, f0.qr AS qr, f0.queries AS queries, f0.answers AS > answers, f0.qtypes AS qtypes, f0.atypes AS atypes, f0.rcode AS rcode, f0.ts > AS ts, type AS dns_type, f0 AS host_ip, f1 AS dns_server_ip) -> to: Row -> > Sink: KafkaTableSink(source, devid, sip, dip, qr, queries, answers, qtypes, > atypes, rcode, ts, dns_type, host_ip, dns_server_ip) (1/4) > (8d064ab137866a2a9040392a87bcc59d) switched from RUNNING to FAILED. > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)