????flink??????????????????HashMap<String,Metadata&gt; 
????????????????????????java????
??flink????????
SingleOutputStreamOperator<Row&gt; returns = tenv.toRetractStream(table, 
Row.class).map(new RichMapFunction<Tuple2<Boolean, Row&gt;, Row&gt;() {
    HashMap<String, Metadata&gt; metadataHashMap1 =metadataHashMap;
    @Override
    public Row map(Tuple2<Boolean, Row&gt; value) throws Exception {
        String[] split = finalColumn.split(",");
        for (int i = 0; i < split.length; i++) {
            if (value.f1.getField(i) == null) continue;
            Object fieldValue = value.f1.getField(i);
            Metadata metadata = metadataHashMap.get(fieldValue);
            value.f1.setField(i, metadata.getTag_value());
        }
        return value.f1;
    }
}).returns(rowTypeInfo);
????????????
Metadata metadata = metadataHashMap.get(fieldValue);??null??????????????

Flink??????1.12.5

??????????????Flink??1.14.0??????????????????????????toRetractStream????toDataStream??????
SingleOutputStreamOperator<Row&gt; returns = tenv.toDataStream(table, 
Row.class).map(new MapFunction<Row, Row&gt;() {
    @Override
    public Row map(Row value) throws Exception {
        String[] split = finalColumn.split(",");
        for (int i = 0; i < split.length; i++) {
            if (value.getField(i) == null) continue;
            Object fieldValue = value.getField(i);
            Metadata metadata = metadataHashMap.get(fieldValue);
            value.setField(i, metadata.getTag_value());
        }
        return value;
    }
})
        .filter(new FilterFunction<Row&gt;() {
    @Override
    public boolean filter(Row value) throws Exception {
        if (value.getField("_id") == null){
            return false;
        }
        return true;
    }
})

        .returns(rowTypeInfo);
returns.print("======&gt;");
??


&nbsp;

回复