Hello,

I have a flink table source working using

"""
    create table source (
        ts TIMESTAMP(3),
        log_line STRING,
        WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) with (
        'connector'='lokitail', 'query'='blah', 'url'='blah'
    )
""")

It uses a simple custom table source, which collects rows like this:

            ctx.collect(GenericRowData.of(
                TimestampData.fromEpochMillis(Instant.now().toEpochMilli()),
                StringData.fromString("field0_counter_" + count++))
            )

I would like, instead, to just send a single JSON string, like:

            ctx.collect(GenericRowData.of(
                StringData.fromString("{\"value\" : \"field0_counter_" + 
count++ + "\", \"ts\":\"" + Instant.now().toEpochMilli() + "\"}"))
            );

And handle parsing in python flink.  Can this be done simply at the point of 
collecting the row data?

Thank you,

Ivan

Reply via email to