Hi all,
I am new to flink sql and having trouble understanding how to debug my
code.
I wish to use some kind of a print function to see the output from my
queries in the pipeline's logs.
Below there's a pipeline similar to the one i am using, which integrates
with kafka and sends an avro message as output. I am able to see an output
on 'out.topic', but am unable to understand how to use the .print() method.

Any help would be appreciated
(Using Flink 1.19)

public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>     String bootstrapServers = "my-bootstrap-servers:9092";
>     String schemaUrl = "my-schema-registry:8081";
>
>     String createSourceTable = "CREATE TABLE input (" +
>             "    NAME STRING," +
>             "    ADDRESS STRING," +
>             "    METRIC1 FLOAT," +
>             "    METRIC2 FLOAT" +
>             ") WITH (" +
>             "    'connector' = 'kafka'," +
>             "    'topic' = 'in.topic'," +
>             "    'properties.bootstrap.servers' = 'http://"; + 
> bootstrapServers + "'," +
>             "    'properties.group.id' = 'flink.sql.group'," +
>             "    'value.format' = 'avro-confluent'," +
>             "    'scan.startup.mode' = 'latest-offset'," +
>             "    'value.avro-confluent.url' = 'http://"; + schemaUrl + "')";
>
>     tableEnv.executeSql(createSourceTable);
>
>     String createSinkTable = "CREATE TABLE output (" +
>             "    NAME STRING," +
>             "    METRIC_RATIO FLOAT," +
>             ") WITH (" +
>             "    'connector' = 'kafka'," +
>             "    'topic' = 'out.topic'," +
>             "    'properties.bootstrap.servers' = '" + bootstrapServers + 
> "'," +
>             "    'properties.group.id' = 'flink.sql.group'," +
>             "    'value.format' = 'avro-confluent'," +
>             "    'value.avro-confluent.url' = 'http://"; + schemaUrl + "')";
>
>     tableEnv.executeSql(createSinkTable);
>
>     String sqlQuery = "INSERT INTO output " +
>             "SELECT " +
>             "    NAME, " +
>             "    CAST(METRIC1 / METRIC2 AS FLOAT) AS METRIC_RATIO" +
>             "FROM input;";
>
>
>     tableEnv.executeSql(sqlQuery);
>
>     tableEnv.executeSql("SELECT * FROM output").print();
>
>    }
>
>

Reply via email to