Hi,

Thanks for the pointer. The documentation says to use tableEnv.registerTableSink, but my IDE shows that this method is deprecated in Flink 1.10, so I still don't see a way to add a sink that prints to stdout. Which sink should I use to print to stdout, and how do I add it without converting the Table into a DataStream?
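The closest thing I could come up with so far is a custom AppendStreamTableSink that just calls print() on the stream it is handed. This is only a rough, untested sketch on my side (the class name PrintTableSink is mine), so please correct me if this is not the intended approach:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

// Append-only sink that forwards every row to stdout via DataStream#print().
public class PrintTableSink implements AppendStreamTableSink<Row> {

    private final TableSchema schema;

    public PrintTableSink(TableSchema schema) {
        this.schema = schema;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // The planner hands the rows over as a DataStream; printing happens here,
        // so my own code never has to call toAppendStream.
        return dataStream.print();
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // Schema is fixed at construction time.
        return this;
    }

    @Override
    public DataType getConsumedDataType() {
        return schema.toRowDataType();
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }
}

and then wiring it up, which still goes through the deprecated registerTableSink, which is exactly what I wanted to avoid:

tableEnvironment.registerTableSink("stdout_sink", new PrintTableSink(resultTable.getSchema()));
resultTable.insertInto("stdout_sink");

Is there a cleaner, non-deprecated way to do this in 1.10?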
Thanks!

On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal. It’s
> not part of any public API.
>
> You don’t have to convert DataStream into Table to read from Kafka in
> Table API. I guess you could, if you had used DataStream API’s
> FlinkKafkaConsumer as it’s documented here [1].
>
> But you should be able to use the Kafka Table connector directly, as it is
> described in the docs [2][3].
>
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>
> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>
> Also, why do I need to convert to a DataStream to print the rows of a table?
> Why not have a print method in the Table itself?
>
> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
>
>> Hi All,
>>
>> Do I need to use the DataStream API or the Table API to construct sources?
>> I am just trying to read from Kafka and print it to the console. Yes, I
>> tried it with DataStreams and it works fine, but I want to do it using the
>> Table-related APIs. I don't see any documentation or a sample on how to
>> create a Kafka table source, or any other source, using the TableSource
>> API, so after some digging I wrote the following code. My ultimate goal is
>> to avoid the DataStream API as much as possible and just use the Table API
>> & SQL, but somehow I feel the Flink framework focuses more on DataStream
>> than on the SQL interface. Am I wrong? From the user perspective, wouldn't
>> it make more sense to focus on SQL interfaces for both streaming and batch?
>>
>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>> import org.apache.flink.table.api.DataTypes;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableSchema;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.table.sources.TableSource;
>> import org.apache.flink.types.Row;
>>
>> import java.io.IOException;
>> import java.util.Properties;
>>
>> public class Test {
>>
>>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>         @Override
>>         public Row deserialize(byte[] message) throws IOException {
>>             return Row.of(new String(message));
>>         }
>>     }
>>
>>     public static void main(String... args) throws Exception {
>>         Test test = new Test();
>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>             .useBlinkPlanner()
>>             .inStreamingMode()
>>             .build();
>>
>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>         StreamTableEnvironment tableEnvironment =
>>             StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>
>>         TableSource tableSource = test.getKafkaTableSource();
>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>
>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>
>>         streamExecutionEnvironment.execute("Sample Job");
>>     }
>>
>>     public KafkaTableSource getKafkaTableSource() {
>>         TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("group.id", "test");
>>         return new KafkaTableSource(tableSchema, "test-topic1", properties,
>>             new MyDeserializationSchema());
>>     }
>> }
>>
>> I get the following error:
>>
>> The program finished with the following exception:
>>
>> The implementation of the FlinkKafkaConsumerBase is not serializable. The
>> object probably contains or references non serializable fields.
>>     org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>     org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>     org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>     org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>     org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>     org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>     org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>     org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>     org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>     org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>     org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>     org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>     scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>     scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>     scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>     scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>     scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>     scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>     scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>     scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>     org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>     org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>     org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>     org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>     org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>     Test.main(Test.java:40)
>>
>> The error seems to be on the line
>>
>>     tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>
>> and I am not sure why it is not able to serialize?
>>
>> Thanks!
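>>
>> P.S. The only theory I have so far: MyDeserializationSchema is a non-static
>> inner class, so it implicitly keeps a reference to the enclosing Test
>> instance, and Test itself is not Serializable, which might be what the
>> ClosureCleaner is complaining about. A fix I have not verified yet would be
>> to declare the schema as a static nested class (or a top-level class), e.g.:
>>
>>     // Static nested class: no hidden reference to the enclosing Test instance,
>>     // so only the schema itself has to be serializable.
>>     public static class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>         @Override
>>         public Row deserialize(byte[] message) throws IOException {
>>             // Wrap the raw Kafka record as a single STRING column.
>>             return Row.of(new String(message));
>>         }
>>     }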