Hi,

You shouldn't be using `KafkaTableSource`, as it's marked @Internal. It's not part of any public API.
You don't have to convert a DataStream into a Table to read from Kafka in the Table API. I guess you could do it that way if you were using the DataStream API's FlinkKafkaConsumer, as documented in [1]. But you should be able to use the Kafka Table connector directly, as described in the docs [2][3].
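For example, something along these lines should register a Kafka-backed table directly with the Table API (an untested sketch on my side: I'm assuming a CSV-encoded payload with the flink-csv format jar on the classpath, and I reused the topic and server settings from your code; swap in new Json() or another format descriptor to match your actual messages):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.descriptors.Csv;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;

    // Registers "kafka_source" in the catalog; no DataStream or custom TableSource needed.
    tableEnvironment
            .connect(
                    new Kafka()
                            .version("universal")   // generic connector for Kafka 0.11+ brokers
                            .topic("test-topic1")
                            .property("bootstrap.servers", "localhost:9092")
                            .property("group.id", "test")
                            .startFromLatest())
            .withFormat(new Csv())                  // assumes the payload matches the schema below
            .withSchema(new Schema()
                    .field("f0", DataTypes.STRING()))
            .createTemporaryTable("kafka_source");

After that, your `select * from kafka_source` query and the toAppendStream(...).print() part stay exactly the same.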
Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>
> Also, why do I need to convert to a DataStream to print the rows of a table? Why not have a print method in the Table itself?
>
> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
> Hi All,
>
> Do I need to use the DataStream API or the Table API to construct sources? I am just trying to read from Kafka and print it to the console. And yes, I tried it with datastreams and it works fine, but I want to do it using the Table-related APIs. I don't see any documentation or a sample on how to create a Kafka table source (or any other source) using the TableSource APIs, so after some digging I wrote the following code. My ultimate goal is to avoid the DataStream API as much as possible and just use the Table API & SQL, but somehow I feel the Flink framework focuses more on DataStream than on the SQL interface. Am I wrong? From the user perspective, wouldn't it make more sense to focus on SQL interfaces for both streaming and batch?
>
>     import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>     import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>     import org.apache.flink.table.api.DataTypes;
>     import org.apache.flink.table.api.EnvironmentSettings;
>     import org.apache.flink.table.api.Table;
>     import org.apache.flink.table.api.TableSchema;
>     import org.apache.flink.table.api.java.StreamTableEnvironment;
>     import org.apache.flink.table.sources.TableSource;
>     import org.apache.flink.types.Row;
>
>     import java.io.IOException;
>     import java.util.Properties;
>
>     public class Test {
>
>         public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>             @Override
>             public Row deserialize(byte[] message) throws IOException {
>                 return Row.of(new String(message));
>             }
>         }
>
>         public static void main(String... args) throws Exception {
>             Test test = new Test();
>             EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                     .useBlinkPlanner()
>                     .inStreamingMode()
>                     .build();
>
>             StreamExecutionEnvironment streamExecutionEnvironment =
>                     StreamExecutionEnvironment.getExecutionEnvironment();
>             StreamTableEnvironment tableEnvironment =
>                     StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>
>             TableSource tableSource = test.getKafkaTableSource();
>             Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>             tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>
>             Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>             tableEnvironment.toAppendStream(resultTable, Row.class).print();
>
>             streamExecutionEnvironment.execute("Sample Job");
>         }
>
>         public KafkaTableSource getKafkaTableSource() {
>             TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
>             Properties properties = new Properties();
>             properties.setProperty("bootstrap.servers", "localhost:9092");
>             properties.setProperty("group.id", "test");
>             return new KafkaTableSource(tableSchema, "test-topic1", properties,
>                     new MyDeserializationSchema());
>         }
>     }
>
> I get the following error:
>
> The program finished with the following exception:
>
> The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
>     org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>     org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>     org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>     org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>     org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>     org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>     org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>     org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>     org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>     org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>     org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>     org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>     org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>     scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>     org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>     org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>     org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>     org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>     Test.main(Test.java:40)
>
> The error seems to be on the line
>
>     tableEnvironment.toAppendStream(resultTable, Row.class).print();
>
> and I am not sure why it is not able to serialize?
>
> Thanks!
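As for the serialization error itself: most likely it's because `MyDeserializationSchema` is a non-static inner class, so every instance holds a hidden reference to the enclosing `Test` object, which is not serializable. The FlinkKafkaConsumerBase that wraps your schema then fails the closure cleaner's check. If you do keep a custom schema, declaring it static should make that particular error go away (this is just a sketch of the fix for the error; the `KafkaTableSource` approach itself is still internal API and I'd avoid it):

    public static class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
        @Override
        public Row deserialize(byte[] message) throws IOException {
            // Same logic as before: wrap the raw bytes in a one-field Row.
            return Row.of(new String(message));
        }
    }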