Hi kant, AFAIK there is currently no "print to stdout" sink for the Table API; you can implement a custom sink by following this doc [1].
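Just to illustrate the idea, a bare-bones sketch of such a sink (untested,
written against the 1.10 interfaces; PrintTableSink is only a placeholder
name) could look like this:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

// An append-only table sink that prints every row to stdout.
public class PrintTableSink implements AppendStreamTableSink<Row> {

    private final TableSchema schema;

    public PrintTableSink(TableSchema schema) {
        this.schema = schema;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // Delegate to the DataStream API's print sink.
        return dataStream.print();
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // Keep the sketch simple: the schema is fixed at construction time.
        return this;
    }
}

You could then register it with something like
tableEnv.registerTableSink("stdout", new PrintTableSink(resultTable.getSchema()))
(deprecated in 1.10, as you noticed, but still functional) and write to it
via resultTable.insertInto("stdout").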
IMO, an out-of-the-box print table sink would be very useful, and I've created an issue [2] to track this.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
[2] https://issues.apache.org/jira/browse/FLINK-16354

On Sun, Mar 1, 2020 at 2:30 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi,
>
> Thanks for the pointer. The documentation says to use
> tableEnv.registerTableSink, but my IDE shows that method as deprecated in
> Flink 1.10, so I still don't see a way to add a sink that prints to
> stdout. Which sink should I use to print to stdout, and how do I add it
> without converting to a DataStream?
>
> Thanks!
>
> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com>
> wrote:
>
>> Hi,
>>
>> You shouldn't be using `KafkaTableSource`, as it's marked @Internal; it
>> is not part of any public API.
>>
>> You don't have to convert a DataStream into a Table to read from Kafka
>> with the Table API. I guess you could, if you had used the DataStream
>> API's FlinkKafkaConsumer as documented here [1].
>>
>> But you should be able to use the Kafka Table connector directly, as
>> described in the docs [2][3].
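>> For illustration, a rough sketch with the 1.10 connector descriptor API
>> (untested; the topic, servers, and schema are taken from the code
>> further down, and the CSV format is only an assumption about the
>> payload) could be:
>>
>> import org.apache.flink.table.descriptors.Csv;
>> import org.apache.flink.table.descriptors.Kafka;
>> import org.apache.flink.table.descriptors.Schema;
>>
>> tableEnvironment
>>     .connect(
>>         new Kafka()
>>             .version("universal")  // the generic Kafka connector
>>             .topic("test-topic1")
>>             .property("bootstrap.servers", "localhost:9092")
>>             .property("group.id", "test"))
>>     .withFormat(new Csv())         // requires the flink-csv dependency
>>     .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>     .createTemporaryTable("kafka_source");
>>
>> Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");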
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>
>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>
>> Also, why do I need to convert to a DataStream to print the rows of a
>> table? Why not have a print method on the Table itself?
>>
>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Do I need to use the DataStream API or the Table API to construct
>>> sources? I am just trying to read from Kafka and print it to the
>>> console. Yes, I tried it with DataStreams and it works fine, but I want
>>> to do it using the Table-related APIs. I don't see any documentation or
>>> samples on how to create a Kafka table source (or any other source)
>>> using the TableSource APIs, so after some digging I wrote the following
>>> code. My ultimate goal is to avoid the DataStream API as much as
>>> possible and just use the Table API & SQL, but somehow I feel the Flink
>>> framework focuses more on DataStream than on the SQL interface. Am I
>>> wrong? From the user's perspective, wouldn't it make more sense to
>>> focus on SQL interfaces for both streaming and batch?
>>>
>>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>> import org.apache.flink.table.api.DataTypes;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.TableSchema;
>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>> import org.apache.flink.table.sources.TableSource;
>>> import org.apache.flink.types.Row;
>>>
>>> import java.io.IOException;
>>> import java.util.Properties;
>>>
>>> public class Test {
>>>
>>>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>>         @Override
>>>         public Row deserialize(byte[] message) throws IOException {
>>>             return Row.of(new String(message));
>>>         }
>>>     }
>>>
>>>     public static void main(String... args) throws Exception {
>>>         Test test = new Test();
>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>                 .useBlinkPlanner()
>>>                 .inStreamingMode()
>>>                 .build();
>>>
>>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>         StreamTableEnvironment tableEnvironment =
>>>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>
>>>         TableSource tableSource = test.getKafkaTableSource();
>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>
>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>
>>>         streamExecutionEnvironment.execute("Sample Job");
>>>     }
>>>
>>>     public KafkaTableSource getKafkaTableSource() {
>>>         TableSchema tableSchema = TableSchema.builder()
>>>                 .field("f0", DataTypes.STRING())
>>>                 .build();
>>>         Properties properties = new Properties();
>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>         properties.setProperty("group.id", "test");
>>>         return new KafkaTableSource(tableSchema, "test-topic1", properties,
>>>                 new MyDeserializationSchema());
>>>     }
>>> }
>>> I get the following error:
>>>
>>> The program finished with the following exception:
>>>
>>> The implementation of the FlinkKafkaConsumerBase is not serializable.
>>> The object probably contains or references non serializable fields.
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>> Test.main(Test.java:40)
>>>
>>> The error seems to be on the line
>>>
>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>
>>> and I am not sure why it is not able to serialize.
>>>
>>> Thanks!

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
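P.S. Regarding the ClosureCleaner error above: MyDeserializationSchema is
declared as a non-static inner class, so every instance captures a hidden
reference to the enclosing Test instance, and Test itself is not
serializable. Declaring the schema class static (or moving it to a
top-level class) should make that particular error go away:

    // Static nested class: no hidden reference to the outer Test instance,
    // so the deserialization schema itself can be serialized.
    public static class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
        @Override
        public Row deserialize(byte[] message) throws IOException {
            return Row.of(new String(message));
        }
    }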