Hi Guys,

The Flink version is 1.9.0. I use OrcTableSource to read an ORC file in HDFS, and the job executes successfully with no exceptions or errors. But some fields (such as tagIndustry) are always null, even though they are not null in the file: I can read these fields when I read the ORC file directly. Below is my code:


//main
final ParameterTool params = ParameterTool.fromArgs(args);

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);

Configuration config = new Configuration();

OrcTableSource orcTableSource = OrcTableSource
    .builder()
    .path(params.get("input"))
    .forOrcSchema(TypeDescription.fromString(TYPE_INFO))
    .withConfiguration(config)
    .build();

DataSet<Row> dataSet = orcTableSource.getDataSet(env);

DataSet<Tuple2<String, Integer>> counts = dataSet
    .flatMap(new Tokenizer())
    .groupBy(0)
    .sum(1);
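One thing worth checking when fields come back null is whether the TYPE_INFO string passed to forOrcSchema matches the schema actually stored in the file, since row.getField(35) reads by position and a mismatched field order would silently yield nulls. A minimal sketch to print the file's own schema for comparison (assumes orc-core and the Hadoop client on the classpath and a readable path; the class name PrintOrcSchema is made up):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

public class PrintOrcSchema {
    public static void main(String[] args) throws Exception {
        // Open the ORC file and print the schema it actually carries,
        // so it can be diffed against the TYPE_INFO string used in the job.
        Reader reader = OrcFile.createReader(new Path(args[0]),
                OrcFile.readerOptions(new Configuration()));
        System.out.println(reader.getSchema().toString());
    }
}
```

If the printed schema differs from TYPE_INFO in field order or names, the positional indices 6 and 35 would point at different columns than expected.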


//read field
public void flatMap(Row row, Collector<Tuple2<String, Integer>> out) {
    String content = (String) row.getField(6);
    String tagIndustry = (String) row.getField(35);

    LOGGER.info("arity: " + row.getArity());
    LOGGER.info("content: " + content);
    LOGGER.info("tagIndustry: " + tagIndustry);
    LOGGER.info("===========================");

    if (Strings.isNullOrEmpty(content) || Strings.isNullOrEmpty(tagIndustry)
            || !tagIndustry.contains("FD001")) {
        return;
    }
    // normalize and split the line
    String[] tokens = content.toLowerCase().split("\\W+");

    // emit the pairs
    for (String token : tokens) {
        if (token.length() > 0) {
            out.collect(new Tuple2<>(token, 1));
        }
    }
}
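As a side note, the guard at the top of flatMap silently drops every row whose tagIndustry is null, so a null field produces an empty word count rather than an error. The tokenize-and-filter logic can be sketched in isolation as plain Java, with no Flink dependency (the class name TokenizeSketch is made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class TokenizeSketch {
    // Mirrors the flatMap body: skip the row unless tagIndustry contains "FD001",
    // then lower-case the content and split on runs of non-word characters.
    static List<String> tokens(String content, String tagIndustry) {
        List<String> out = new ArrayList<>();
        if (content == null || content.isEmpty()
                || tagIndustry == null || !tagIndustry.contains("FD001")) {
            return out;
        }
        for (String token : content.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                out.add(token);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(tokens("Hello, Flink World", "FD001")); // [hello, flink, world]
        System.out.println(tokens("Hello, Flink World", null));    // [] -- null tag drops the row
    }
}
```

This makes it easy to confirm that rows with a null tagIndustry are filtered out before any tokens are emitted.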


Thanks for your help!


QiShu
