Here is my updated code after digging through the source code (not sure if it is correct). It still doesn't work: the error says that for CSV the connector.type should be filesystem, not Kafka, but the documentation says Kafka is supported.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class Test {

    public static void main(String... args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings);

        tableEnvironment
            .connect(
                new Kafka()
                    .version("0.11")
                    .topic("test-topic1")
            )
            .withFormat(new Csv())
            .withSchema(new Schema().field("f0", DataTypes.STRING()))
            .inAppendMode()
            .createTemporaryTable("kafka_source");

        Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");

        tableEnvironment
            .connect(
                new Kafka()
                    .version("0.11")
                    .topic("test-topic2")
            )
            .withFormat(new Csv())
            .withSchema(new Schema().field("f0", DataTypes.STRING()))
            .inAppendMode()
            .createTemporaryTable("kafka_target");

        tableEnvironment.insertInto("kafka_target", resultTable);

        tableEnvironment.execute("Sample Job");
    }
}

On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> wrote:

> Hi Benchao,
>
> Agreed a ConsoleSink is very useful but that is not the only problem here.
> Documentation says use tableEnv.registerTableSink all over the place
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
> however that function is deprecated. So how do I add any other Sink?
>
> Thanks!
>
> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote:
>
>> Hi kant,
>>
>> AFAIK, there is no "print to stdout" sink for Table API now, you can
>> implement one custom sink following this doc[1].
>>
>> IMO, an out-of-box print table sink is very useful, and I've created an
>> issue[2] to track this.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>
>> On Sun, Mar 1, 2020 at 2:30 AM kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the pointer. Looks like the documentation says to use
>>> tableEnv.registerTableSink, however in my IDE it shows the method is
>>> deprecated in Flink 1.10. So I am still not seeing a way to add a sink
>>> that can print to stdout. What sink should I use to print to stdout and
>>> how do I add it without converting into DataStream?
>>>
>>> Thanks!
>>>
>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal.
>>>> It’s not part of any public API.
>>>>
>>>> You don’t have to convert DataStream into Table to read from Kafka in
>>>> Table API. I guess you could, if you had used DataStream API’s
>>>> FlinkKafkaConsumer as it’s documented here [1].
>>>>
>>>> But you should be able to use Kafka Table connector directly, as it is
>>>> described in the docs [2][3].
>>>>
>>>> Piotrek
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>
>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>> Also why do I need to convert to DataStream to print the rows of a
>>>> table? Why not have a print method in the Table itself?
>>>>
>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Do I need to use DataStream API or Table API to construct sources? I
>>>>> am just trying to read from Kafka and print it to console. And yes, I
>>>>> tried it with datastreams and it works fine, but I want to do it using
>>>>> Table related APIs. I don't see any documentation or a sample on how to
>>>>> create a Kafka table source or any other source using the Table Source
>>>>> APIs, so after some digging I wrote the following code. My ultimate
>>>>> goal is to avoid the DataStream API as much as possible and just use
>>>>> Table API & SQL, but somehow I feel the Flink framework focuses more on
>>>>> DataStream than on the SQL interface. Am I wrong? From the user
>>>>> perspective wouldn't it make more sense to focus on SQL interfaces for
>>>>> both streaming and batch?
>>>>>
>>>>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>> import org.apache.flink.table.api.DataTypes;
>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>> import org.apache.flink.table.api.Table;
>>>>> import org.apache.flink.table.api.TableSchema;
>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>> import org.apache.flink.table.sources.TableSource;
>>>>> import org.apache.flink.types.Row;
>>>>>
>>>>> import java.io.IOException;
>>>>> import java.util.Properties;
>>>>>
>>>>> public class Test {
>>>>>
>>>>>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>>>>         @Override
>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>             return Row.of(new String(message));
>>>>>         }
>>>>>     }
>>>>>
>>>>>     public static void main(String... args) throws Exception {
>>>>>         Test test = new Test();
>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>                 .useBlinkPlanner()
>>>>>                 .inStreamingMode()
>>>>>                 .build();
>>>>>
>>>>>         StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>
>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>>>
>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>
>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>     }
>>>>>
>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>         TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>>>>         Properties properties = new Properties();
>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>         properties.setProperty("group.id", "test");
>>>>>         return new KafkaTableSource(tableSchema, "test-topic1", properties, new MyDeserializationSchema());
>>>>>     }
>>>>> }
>>>>>
>>>>> I get the following error
>>>>>
>>>>> The program finished with the following exception:
>>>>>
>>>>> The implementation of the FlinkKafkaConsumerBase is not serializable.
>>>>> The object probably contains or references non serializable fields.
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>> Test.main(Test.java:40)
>>>>>
>>>>> The error seems to be on the line
>>>>>
>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>
>>>>> and I am not sure why it is not able to serialize?
>>>>>
>>>>> Thanks!
>>>>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
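
A note on the "not serializable" error in the first message quoted above. Nobody in the thread confirms the cause, so treat this as a guess: MyDeserializationSchema is declared as a non-static inner class of Test, so each instance carries a hidden reference to the enclosing Test object, and Test itself is not Serializable. The sketch below reproduces that effect with plain java.io serialization and no Flink at all. The names SerializationDemo, InnerSchema, NestedSchema and isSerializable are made up for illustration and are not part of any Flink API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the Test class from the thread; it is NOT Serializable.
public class SerializationDemo {

    // Non-static inner class: every instance keeps a hidden reference to the
    // enclosing SerializationDemo instance, which gets serialized along with it.
    public class InnerSchema implements Serializable {
    }

    // Static nested class: no hidden reference to an enclosing instance.
    public static class NestedSchema implements Serializable {
    }

    // Returns true if Java serialization accepts the whole object graph.
    public static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        SerializationDemo outer = new SerializationDemo();
        System.out.println("inner class serializable:  " + isSerializable(outer.new InnerSchema()));
        System.out.println("nested class serializable: " + isSerializable(new NestedSchema()));
    }
}
```

If this is indeed what happens in the original program, declaring MyDeserializationSchema as "public static class" (or moving it to its own top-level class) would be the corresponding change to try. That is an assumption based on the code as quoted, not something verified against Flink 1.10.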