目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) def drop_fields(message, *fields): import json message = json.loads(message) for field in fields: message.pop(field) return json.dumps(message) st_env \ .form_path("source") \ .select("drop_fields(message,'x')") \ .insert_into("sink") message 格式: {“a”:"1","x","2"} 报错参数类型不匹配: Actual:(java.lang.String, java.lang.String) Expected:(org.apache.flink.table.dataformat.BinaryString) 新手入门,请多指教,感谢。