非常感谢解惑,刚开始使用pyflink,不是很熟悉,还请多多指教
在 2020-06-01 20:50:53,"Xingbo Huang" <hxbks...@gmail.com> 写道: Hi, 其实这个是CSV connector的一个可选的quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。 st_env.connect( Kafka() .version("0.11") .topic("logSink") .start_from_earliest() .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092")) \ .with_format( # declare a format for this system Csv() .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())])) .quote_character("\0") ) \ .with_schema( # declare the schema of the table Schema() .field("log", DataTypes.STRING())) \ .in_append_mode() \ .register_table_sink("sink") Best, Xingbo