The input types should be as following: input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
Regards, Dian > 在 2020年6月1日,上午10:49,刘亚坤 <wslyk...@163.com> 写道: > > 目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题: > > @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) > def drop_fields(message, *fields): > import json > message = json.loads(message) > for field in fields: > message.pop(field) > return json.dumps(message) > > > st_env \ > .form_path("source") \ > .select("drop_fields(message,'x')") \ > .insert_into("sink") > > message 格式: > {“a”:"1","x","2"} > > 报错参数类型不匹配: > Actual:(java.lang.String, java.lang.String) > Expected:(org.apache.flink.table.dataformat.BinaryString) > > 新手入门,请多指教,感谢。