Hi all,
I am new to flink sql and having trouble understanding how to debug my
code.
I wish to use some kind of a print function to see the output from my
queries in the pipeline's logs.
Below there's a pipeline similar to the one i am using, which integrates
with kafka and sends an avro message as output. I am able to see an output
on 'out.topic', but am unable to understand how to use the .print() method.
Any help would be appreciated
(Using Flink 1.19)
public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> String bootstrapServers = "my-bootstrap-servers:9092";
> String schemaUrl = "my-schema-registry:8081";
>
> String createSourceTable = "CREATE TABLE input (" +
> " NAME STRING," +
> " ADDRESS STRING," +
> " METRIC1 FLOAT," +
> " METRIC2 FLOAT" +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'in.topic'," +
> " 'properties.bootstrap.servers' = 'http://" +
> bootstrapServers + "'," +
> " 'properties.group.id' = 'flink.sql.group'," +
> " 'value.format' = 'avro-confluent'," +
> " 'scan.startup.mode' = 'latest-offset'," +
> " 'value.avro-confluent.url' = 'http://" + schemaUrl + "')";
>
> tableEnv.executeSql(createSourceTable);
>
> String createSinkTable = "CREATE TABLE output (" +
> " NAME STRING," +
> " METRIC_RATIO FLOAT," +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'out.topic'," +
> " 'properties.bootstrap.servers' = '" + bootstrapServers +
> "'," +
> " 'properties.group.id' = 'flink.sql.group'," +
> " 'value.format' = 'avro-confluent'," +
> " 'value.avro-confluent.url' = 'http://" + schemaUrl + "')";
>
> tableEnv.executeSql(createSinkTable);
>
> String sqlQuery = "INSERT INTO output " +
> "SELECT " +
> " NAME, " +
> " CAST(METRIC1 / METRIC2 AS FLOAT) AS METRIC_RATIO" +
> "FROM input;";
>
>
> tableEnv.executeSql(sqlQuery);
>
> tableEnv.executeSql("SELECT * FROM output").print();
>
> }
>
>