????flink??????????????????HashMap<String,Metadata> ????????????????????????java???? ??flink???????? SingleOutputStreamOperator<Row> returns = tenv.toRetractStream(table, Row.class).map(new RichMapFunction<Tuple2<Boolean, Row>, Row>() { HashMap<String, Metadata> metadataHashMap1 =metadataHashMap; @Override public Row map(Tuple2<Boolean, Row> value) throws Exception { String[] split = finalColumn.split(","); for (int i = 0; i < split.length; i++) { if (value.f1.getField(i) == null) continue; Object fieldValue = value.f1.getField(i); Metadata metadata = metadataHashMap.get(fieldValue); value.f1.setField(i, metadata.getTag_value()); } return value.f1; } }).returns(rowTypeInfo); ???????????? Metadata metadata = metadataHashMap.get(fieldValue);??null??????????????
Flink??????1.12.5 ??????????????Flink??1.14.0??????????????????????????toRetractStream????toDataStream?????? SingleOutputStreamOperator<Row> returns = tenv.toDataStream(table, Row.class).map(new MapFunction<Row, Row>() { @Override public Row map(Row value) throws Exception { String[] split = finalColumn.split(","); for (int i = 0; i < split.length; i++) { if (value.getField(i) == null) continue; Object fieldValue = value.getField(i); Metadata metadata = metadataHashMap.get(fieldValue); value.setField(i, metadata.getTag_value()); } return value; } }) .filter(new FilterFunction<Row>() { @Override public boolean filter(Row value) throws Exception { if (value.getField("_id") == null){ return false; } return true; } }) .returns(rowTypeInfo); returns.print("======>"); ??