Hi All,

Do I need to use the DataStream API or the Table API to construct sources? I am just trying to read from Kafka and print it to the console. It works fine when I do it with DataStreams, but I want to do it using the Table-related APIs. I don't see any documentation or sample on how to create a Kafka table source (or any other source) using the TableSource API, so after some digging I wrote the code below. My ultimate goal is to avoid the DataStream API as much as possible and use only the Table API & SQL, but somehow I feel the Flink framework focuses more on DataStream than on the SQL interface. Am I wrong? From the user's perspective, wouldn't it make more sense to focus on SQL interfaces for both streaming and batch? (At the end of this mail I sketch the kind of pure-SQL source definition I was hoping for.)
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.util.Properties;

public class Test {

    public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
        @Override
        public Row deserialize(byte[] message) throws IOException {
            return Row.of(new String(message));
        }
    }

    public static void main(String... args) throws Exception {
        Test test = new Test();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment =
                StreamTableEnvironment.create(streamExecutionEnvironment, settings);

        TableSource tableSource = test.getKafkaTableSource();
        Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
        tableEnvironment.createTemporaryView("kafka_source", kafkaTable);

        Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
        tableEnvironment.toAppendStream(resultTable, Row.class).print();

        streamExecutionEnvironment.execute("Sample Job");
    }

    public KafkaTableSource getKafkaTableSource() {
        TableSchema tableSchema = TableSchema.builder()
                .field("f0", DataTypes.STRING())
                .build();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        return new KafkaTableSource(tableSchema, "test-topic1", properties,
                new MyDeserializationSchema());
    }
}

I get the following error:

The program finished with the following exception:

The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
    org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
    org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
    org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
    org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
    org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
    org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
    org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
    org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
    org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
    org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
    org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
    scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    scala.collection.Iterator$class.foreach(Iterator.scala:891)
    scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    scala.collection.AbstractTraversable.map(Traversable.scala:104)
    org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
    org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
    org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
    org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
    org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
    Test.main(Test.java:40)

The error seems to point at the line

    tableEnvironment.toAppendStream(resultTable, Row.class).print();

and I am not sure why the source cannot be serialized.
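One guess after re-reading my own code: MyDeserializationSchema is declared as a non-static inner class, so every instance carries a hidden reference to the enclosing Test object, and Test itself does not implement Serializable. If that is really the cause, declaring the schema as a static nested class (or a top-level class) should be enough. A minimal sketch of what I mean (untested):

    // Guess: the hidden Test.this reference is what trips the ClosureCleaner.
    // A static nested class has no such reference, so only the schema itself
    // (AbstractDeserializationSchema is Serializable) gets shipped to the cluster.
    public static class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
        @Override
        public Row deserialize(byte[] message) throws IOException {
            // Interpret the raw Kafka record bytes as a single-field string row.
            return Row.of(new String(message));
        }
    }

Does that sound right, or is something else in KafkaTableSource non-serializable?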
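And for context on my first question: what I was really hoping to write is a pure SQL definition of the source instead of constructing a KafkaTableSource by hand. If I am reading the 1.10 DDL documentation correctly, something like the sketch below should do the same job; I have not verified the exact connector property names, and 'format.type' = 'csv' would additionally require the flink-csv dependency on the classpath:

    // Sketch only, assuming Flink 1.10's DDL-based Kafka connector properties.
    tableEnvironment.sqlUpdate(
        "CREATE TABLE kafka_source (" +
        "  f0 STRING" +
        ") WITH (" +
        "  'connector.type' = 'kafka'," +
        "  'connector.version' = 'universal'," +
        "  'connector.topic' = 'test-topic1'," +
        "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
        "  'connector.properties.group.id' = 'test'," +
        "  'format.type' = 'csv'" +
        ")");
    Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");

Is that the intended way to define sources without touching the DataStream API?

Thanks!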