Hi,

This is actually the effect of the CSV connector's optional quote_character parameter. Its default value is ", so every field gets wrapped in a pair of quotes. If you configure this parameter to \0, you will get the output you want:

st_env.connect(
    Kafka()
    .version("0.11")
    .topic("logSink")
    .start_from_earliest()
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")) \
    .with_format(  # declare a format for this system
        Csv()
        .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
        .quote_character("\0")) \
    .with_schema(  # declare the schema of the table
        Schema()
        .field("log", DataTypes.STRING())) \
    .in_append_mode() \
    .register_table_sink("sink")
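The doubled quotes in the sink output are standard CSV quote-escaping, not corrupted JSON. A minimal sketch using Python's standard csv module (no Flink needed) reproduces the exact string the sink currently emits:

```python
import csv
import io

# A single-field row whose value contains the CSV delimiter (,) and the
# quote character (") -- just like the JSON string written to the sink.
row = ['{"topic": "logSource", "message": "x=1,y=1,z=1"}']

buf = io.StringIO()
# Default quoting wraps the field in " and doubles every embedded ",
# the same behavior as the Flink CSV format's default quote_character.
csv.writer(buf).writerow(row)
print(buf.getvalue().strip())
# → "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"
```

Setting quote_character("\0") in the CSV descriptor disables this wrapping, so the JSON string passes through to Kafka unchanged.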
Best,
Xingbo

jack <wslyk...@163.com> wrote on Monday, June 1, 2020 at 5:31 PM:

> Hi all, I am using pyflink to consume JSON strings from Kafka and send
> them back to Kafka, treating each Kafka message as a single field.
>
> Input data:
> {"topic": "logSource", "message": "x=1,y=1,z=1"}
>
> The data that ends up in Kafka looks like this:
> "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"
>
> It has been wrapped in an extra layer of double quotes. My udf uses the
> json module for processing; could you please help me figure out how to
> turn this back into a normal JSON string?
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(),
>                   DataTypes.STRING()], result_type=DataTypes.STRING())
> def kv(log, pair_sep=',', kv_sep='='):
>     import json
>     import re
>     log = json.loads(log)
>     ret = {}
>     items = re.split(pair_sep, log.get("message"))
>     for item in items:
>         ret[re.split(kv_sep, item)[0]] = re.split(kv_sep, item)[1]
>     log.update(ret)
>     log = json.dumps(log)
>     return log
>
> def register_source(st_env):
>     st_env \
>         .connect(  # declare the external system to connect to
>             Kafka()
>             .version("0.10")
>             .topic("logSource")
>             .start_from_latest()
>             .property("zookeeper.connect", "localhost:2181")
>             .property("bootstrap.servers", "localhost:9092")) \
>         .with_format(  # declare a format for this system
>             Csv()
>             .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
>             .field_delimiter("\n")) \
>         .with_schema(  # declare the schema of the table
>             Schema()
>             .field("log", DataTypes.STRING())) \
>         .in_append_mode() \
>         .register_table_source("source")
>
> def register_sink(st_env):
>     st_env.connect(
>         Kafka()
>         .version("0.10")
>         .topic("logSink")
>         .start_from_earliest()
>         .property("zookeeper.connect", "localhost:2181")
>         .property("bootstrap.servers", "localhost:9092")) \
>         .with_format(  # declare a format for this system
>             Csv()
>             .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))) \
>         .with_schema(  # declare the schema of the table
>             Schema()
>             .field("log", DataTypes.STRING())) \
>         .in_append_mode() \
>         .register_table_sink("sink")
>
> if __name__ == '__main__':
>     s_env = StreamExecutionEnvironment.get_execution_environment()
>     s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>     s_env.set_parallelism(1)
>     st_env = StreamTableEnvironment \
>         .create(s_env, environment_settings=EnvironmentSettings
>                 .new_instance()
>                 .in_streaming_mode()
>                 .use_blink_planner().build())
>     st_env.register_function('kv', kv)
>     register_source(st_env)
>     register_sink(st_env)
>     st_env \
>         .from_path("source") \
>         .select("kv(log, ',', '=') as log") \
>         .insert_into("sink")
>     st_env.execute("test")
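Incidentally, the parsing logic of the kv UDF quoted above can be exercised as plain Python, without a Flink cluster; a minimal sketch, using the sample record from the question:

```python
import json
import re

def kv(log, pair_sep=',', kv_sep='='):
    # Parse the JSON envelope, split "message" into key=value pairs,
    # and merge the pairs back into the record before re-serializing.
    record = json.loads(log)
    pairs = {}
    for item in re.split(pair_sep, record.get("message")):
        key, value = re.split(kv_sep, item)[:2]
        pairs[key] = value
    record.update(pairs)
    return json.dumps(record)

print(kv('{"topic": "logSource", "message": "x=1,y=1,z=1"}'))
# → {"topic": "logSource", "message": "x=1,y=1,z=1", "x": "1", "y": "1", "z": "1"}
```

The function only works when its input is the plain JSON string, which is why the extra CSV quoting on the sink side has to be disabled.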