Hello, I have a flink table source working using
""" create table source ( ts TIMESTAMP(3), log_line STRING, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND ) with ( 'connector'='lokitail', 'query'='blah', 'url'='blah' ) """) It uses a simple custom table source, which collects rows like this: ctx.collect(GenericRowData.of( TimestampData.fromEpochMillis(Instant.now().toEpochMilli()), StringData.fromString("field0_counter_" + count++)) ) I would like, instead, to just send a single JSON string, like: ctx.collect(GenericRowData.of( StringData.fromString("{\"value\" : \"field0_counter_" + count++ + "\", \"ts\":\"" + Instant.now().toEpochMilli() + "\"}")) ); And handle parsing in python flink. Can this be done simply at the point of collecting the row data? Thank you, Ivan