最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is not
supported:
ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array Json2Map
udf应该怎么操作呢?求前辈指导
udfd代码如下:
public class Json2List extends ScalarFunction {
private static final Logger LOG = LoggerFactory.getLogger(Json2List.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ;
public Json2List(){}
public List<String> eval(String param) {
List<String> result = new ArrayList<>();
try {
List<Map<Object, Object>> list =
OBJECT_MAPPER.readValue(param, List.class);
for(Map<Object, Object> map : list){
result.add(OBJECT_MAPPER.writeValueAsString(map));
}
return result;
} catch (JsonProcessingException e){
LOG.error("failed to convert json to array, param is: {}", param, e);
}
return result;
}
@Override
public TypeInformation<List<String>> getResultType(Class<?>[] signature) {
return Types.LIST(Types.STRING);
}
}