大家好:
    我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
那么在eval方法接收到的就是Row[],
问题出在,Row[]中的数据获取不到,里面的元素都是NULL

通过下面的步骤和代码可以复现该问题场景:
kafka topic: test_action
kafka message:
    {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }

代码1:Problem.java
package com.flink;

import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 *
 * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
 * 那么在eval方法接收到的就是Row[],
 * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
 *
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
 */
public class Problem {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
        bsEnv.registerFunction("explode2", new ExplodeFunction());

        String ddlSource = "CREATE TABLE actionTable (\n" +
                "    action ARRAY<\n" +
                "               ROW<" +
                "                   actionID STRING,\n" +
                "                   actionName STRING\n" +
                "                       >\n" +
                "                           >\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = '0.11',\n" +
                "    'connector.topic' = 'test_action',\n" +
                "    'connector.startup-mode' = 'earliest-offset',\n" +
                "    'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
                "    'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
                "    'update-mode' = 'append',\n" +
                "    'format.type' = 'json'\n" +
                ")";
        bsEnv.sqlUpdate(ddlSource);

//        Table table = bsEnv.sqlQuery("select `action` from actionTable");
        Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL
TABLE(explode2(`action`)) as T(`word`)");
        table.printSchema();
        bsEnv.toAppendStream(table, Row.class)
                .print("==tb==");


        bsEnv.execute("ARRAY tableFunction Problem");
    }
}

代码2:ExplodeFunction.java
package com.flink;

import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.Arrays;

public class ExplodeFunction extends TableFunction<Row> {

    /**
     * Emits one output row per element of the incoming array; each element's
     * fields are joined into a single string column via {@link Arrays#toString}.
     *
     * <p>NOTE(review): in the reported scenario the Row[] arrives with all
     * elements null. This looks like the legacy type extraction failing to
     * carry the ROW element type for ARRAY arguments — overriding
     * {@code getParameterTypes(...)} with an explicit
     * {@code Types.OBJECT_ARRAY(Types.ROW(...))} is the usual workaround;
     * TODO confirm against the Flink version in use.
     *
     * @param values elements of the SQL ARRAY&lt;ROW&lt;...&gt;&gt; argument;
     *               the array itself and its entries may be null (both are skipped)
     */
    public void eval(Row[] values) {
        if (values == null) {
            return; // a SQL NULL array produces no output rows
        }
        System.out.println(values.length); // debug output kept from the repro case
        for (Row row : values) {
            if (row == null) {
                continue; // observed symptom: the elements debug as null here
            }
            // Collect all fields of the row, preserving field order.
            ArrayList<Object> fields = new ArrayList<>(row.getArity());
            for (int i = 0; i < row.getArity(); i++) {
                fields.add(row.getField(i));
            }
            collector.collect(Row.of(Arrays.toString(fields.toArray())));
        }
    }
}

最后贴个debug的图
[image: image.png]

回复