Hi Ivan,

Is your question how to parse the JSON string in PyFlink? If so, maybe you could take a look at this [1].
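In case it helps, here is a rough sketch of that approach in PyFlink, assuming the source exposes a single STRING column called log_line (the connector options and field names below are just placeholders copied from your example, not something I have tested against your connector):

    import json
    from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
    from pyflink.table.udf import udf

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # Source with a single raw-JSON STRING column (options copied from your example).
    t_env.execute_sql("""
        create table source (
            log_line STRING
        ) with (
            'connector'='lokitail', 'query'='blah', 'url'='blah'
        )
    """)

    # Option 1: the built-in JSON_VALUE function (Flink 1.15+);
    # here 'ts' inside the JSON is assumed to be epoch milliseconds.
    parsed = t_env.sql_query("""
        SELECT
            JSON_VALUE(log_line, '$.value') AS `value`,
            TO_TIMESTAMP_LTZ(CAST(JSON_VALUE(log_line, '$.ts') AS BIGINT), 3) AS ts
        FROM source
    """)

    # Option 2: a Python UDF, useful if the JSON structure varies (the case in [1]).
    @udf(result_type=DataTypes.STRING())
    def extract_value(log_line):
        return json.loads(log_line).get('value')

    t_env.create_temporary_function("extract_value", extract_value)
    parsed_udf = t_env.sql_query("SELECT extract_value(log_line) AS `value` FROM source")

If you still need the event-time watermark, one option is to declare ts as a computed column in the DDL (e.g. based on JSON_VALUE as above) and put the WATERMARK clause on that computed column.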
Regards,
Dian

[1] https://stackoverflow.com/questions/71820015/how-to-reference-nested-json-within-pyflink-sql-when-json-schema-varies

On Fri, Jun 10, 2022 at 7:40 PM ivan.ros...@agilent.com <ivan.ros...@agilent.com> wrote:

> Hello,
>
> I have a Flink table source working using
>
> """
> create table source (
>     ts TIMESTAMP(3),
>     log_line STRING,
>     WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
> ) with (
>     'connector'='lokitail', 'query'='blah', 'url'='blah'
> )
> """
>
> It uses a simple custom table source, which collects rows like this:
>
> ctx.collect(GenericRowData.of(
>     TimestampData.fromEpochMillis(Instant.now().toEpochMilli()),
>     StringData.fromString("field0_counter_" + count++))
> );
>
> I would like, instead, to just send a single JSON string, like:
>
> ctx.collect(GenericRowData.of(
>     StringData.fromString("{\"value\" : \"field0_counter_" + count++ + "\", \"ts\":\"" + Instant.now().toEpochMilli() + "\"}"))
> );
>
> and handle the parsing in PyFlink. Can this be done simply at the point of collecting the row data?
>
> Thank you,
> Ivan