请教各位：我这边使用 PyFlink 消费 Kafka 中的 JSON 字符串数据，按照其中一个字段（message）对消息进行处理后，再发送到 Kafka。
# Input record (read from topic "logSource"):
#     {"topic": "logSource", "message": "x=1,y=1,z=1"}
# What arrived in the sink topic was wrapped in one extra layer of double quotes:
#     "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"
# The UDF below uses the json module; the question is how to get a plain JSON
# string in the output instead.
#
# Cause: the extra quoting is CSV quoting applied by the sink's Csv() format,
# not something the UDF does. A CSV writer with the default quote character (")
# encloses any value containing quotes or commas in quotes and doubles every
# embedded quote. register_sink() below disables the quote character so the
# JSON text is written unchanged.


@udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()],
     result_type=DataTypes.STRING())
def kv(log, pair_sep=',', kv_sep='='):
    """Expand the ``key=value`` pairs of a JSON log's ``message`` into top-level fields.

    ``log`` is parsed as a JSON object. Its ``message`` string is split into
    pairs on ``pair_sep``, and each pair is split into key and value on the
    first ``kv_sep``. Both separators are regular expressions, as accepted by
    ``re.split``. The pairs are merged into the object, which is returned
    re-serialized with ``json.dumps``.

    Example::

        '{"topic": "logSource", "message": "x=1,y=1,z=1"}'
        -> '{"topic": "logSource", "message": "x=1,y=1,z=1", "x": "1", "y": "1", "z": "1"}'

    Args:
        log: JSON text of one record; ``None`` (SQL NULL) is passed through.
        pair_sep: pattern separating pairs inside ``message``.
        kv_sep: pattern separating a key from its value inside one pair.

    Returns:
        The enriched record as a JSON string, or ``None`` when ``log`` is ``None``.
    """
    # Imported locally, as the original UDF already did for json.
    import json
    import re

    if log is None:
        return None

    record = json.loads(log)
    message = record.get("message") or ""
    for item in re.split(pair_sep, message):
        if not item:
            continue  # tolerate empty pairs, e.g. from a trailing separator
        # Split only on the first separator so values may contain kv_sep.
        parts = re.split(kv_sep, item, maxsplit=1)
        # A pair without a separator becomes a key with an empty value
        # instead of raising IndexError and failing the job.
        record[parts[0]] = parts[1] if len(parts) > 1 else ""
    return json.dumps(record)


def register_source(st_env):
    """Register Kafka topic ``logSource`` as table ``source`` with one STRING column ``log``."""
    (
        st_env
        .connect(  # declare the external system to connect to
            Kafka()
            .version("0.10")
            .topic("logSource")
            .start_from_latest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092"))
        .with_format(  # declare a format for this system
            Csv()
            .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
            # Newline as the field delimiter keeps the commas inside the JSON
            # text from splitting it into several columns.
            .field_delimiter("\n"))
        .with_schema(  # declare the schema of the table
            Schema()
            .field("log", DataTypes.STRING()))
        .in_append_mode()
        .register_table_source("source")
    )


def register_sink(st_env):
    """Register Kafka topic ``logSink`` as table ``sink`` with one STRING column ``log``.

    The CSV quote character is disabled so that JSON text written to ``log``
    reaches Kafka verbatim instead of being quoted and quote-doubled.
    """
    (
        st_env
        .connect(  # declare the external system to connect to
            Kafka()
            .version("0.10")
            .topic("logSink")
            .start_from_earliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092"))
        .with_format(  # declare a format for this system
            Csv()
            .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
            # With the default quote character (") the CSV writer turned
            # {"topic": ...} into "{""topic"": ...}". Without a quote
            # character the value is emitted unchanged.
            # NOTE(review): disable_quote_character() is assumed to be available
            # (PyFlink 1.11+) — confirm against the deployed Flink version.
            .disable_quote_character())
        .with_schema(  # declare the schema of the table
            Schema()
            .field("log", DataTypes.STRING()))
        .in_append_mode()
        .register_table_sink("sink")
    )


if __name__ == '__main__':
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)
    settings = (
        EnvironmentSettings
        .new_instance()
        .in_streaming_mode()
        .use_blink_planner()
        .build()
    )
    st_env = StreamTableEnvironment.create(s_env, environment_settings=settings)
    # Register the UDF under the same name the select expression calls.
    st_env.register_function('kv', kv)
    register_source(st_env)
    register_sink(st_env)
    (
        st_env
        .from_path("source")
        .select("kv(log,',', '=') as log")
        .insert_into("sink")
    )
    st_env.execute("test")