Hi,

> Thanks for the pointer. Looks like the documentation says to use
> tableEnv.registerTableSink; however, in my IDE it shows the method is
> deprecated in Flink 1.10.
It looks like not all of the documentation was updated after methods were
deprecated. However, if you look at the Javadoc of the `registerTableSink`
method, you can find an answer [1].

> It still doesn't work, because it says that for CSV the connector.type
> should be filesystem, not Kafka.

Can you post the full stack trace? As I'm not familiar with the Table API,
maybe Timo or Dawid know what's going on here?

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-

> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>
> Here is my updated code after digging through the source code (not sure
> if it is correct). It still doesn't work, because it says that for CSV
> the connector.type should be filesystem, not Kafka, while the
> documentation says Kafka is supported.
>
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
>
> public class Test {
>
>     public static void main(String... args) throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>
>         StreamExecutionEnvironment streamExecutionEnvironment =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         streamExecutionEnvironment.setStateBackend(
>                 (StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>         StreamTableEnvironment tableEnvironment =
>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>
>         tableEnvironment
>                 .connect(
>                         new Kafka()
>                                 .version("0.11")
>                                 .topic("test-topic1"))
>                 .withFormat(new Csv())
>                 .withSchema(new Schema().field("f0", DataTypes.STRING()))
>                 .inAppendMode()
>                 .createTemporaryTable("kafka_source");
>
>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>
>         tableEnvironment
>                 .connect(
>                         new Kafka()
>                                 .version("0.11")
>                                 .topic("test-topic2"))
>                 .withFormat(new Csv())
>                 .withSchema(new Schema().field("f0", DataTypes.STRING()))
>                 .inAppendMode()
>                 .createTemporaryTable("kafka_target");
>
>         tableEnvironment.insertInto("kafka_target", resultTable);
>
>         tableEnvironment.execute("Sample Job");
>     }
> }
>
> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> wrote:
> Hi Benchao,
>
> Agreed, a ConsoleSink is very useful, but that is not the only problem
> here. The documentation says to use tableEnv.registerTableSink all over
> the place:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
> however that function is deprecated. So how do I add any other sink?
>
> Thanks!
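For what it's worth, the non-deprecated route in 1.10 seems to be exactly the
descriptor chain your updated code already uses: declare the sink table via
connect(...).createTemporaryTable(...) and write to it with an INSERT INTO
statement, instead of the deprecated registerTableSink/insertInto calls. A
rough, untested sketch for a CSV filesystem sink (the output path, table name
"csv_sink", and field name are made up for illustration, and it assumes the
"kafka_source" table from your snippet is already registered):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;

public class CsvSinkExample {

    public static void main(String... args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // ... register "kafka_source" here, as in the snippet above ...

        // Register the sink table without registerTableSink: the descriptor
        // chain ends in createTemporaryTable instead.
        tableEnv
                .connect(new FileSystem().path("/tmp/output.csv"))  // hypothetical path
                .withFormat(new OldCsv().field("f0", DataTypes.STRING()))
                .withSchema(new Schema().field("f0", DataTypes.STRING()))
                .inAppendMode()
                .createTemporaryTable("csv_sink");

        // Writing via SQL also avoids the deprecated insertInto(String, Table).
        tableEnv.sqlUpdate("INSERT INTO csv_sink SELECT f0 FROM kafka_source");
        tableEnv.execute("csv sink example");
    }
}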
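On the ConsoleSink point: until FLINK-16354 (mentioned below) lands, a custom
sink along the lines of the "define a TableSink" doc is probably the usual
workaround. Here is a minimal, untested sketch against the 1.10 interfaces;
note that registering it still goes through registerTableSink, which is
deprecated but, as far as I can tell, still functional:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class PrintTableSink implements AppendStreamTableSink<Row> {

    private final TableSchema schema;

    public PrintTableSink(TableSchema schema) {
        this.schema = schema;
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }

    @Override
    public DataType getConsumedDataType() {
        return schema.toRowDataType();
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // The planner calls this with the query's field names and types;
        // this sketch simply keeps the schema it was constructed with.
        return this;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // Delegate to the DataStream print sink, which writes each row to stdout.
        return dataStream.print();
    }
}

Registering and using it would then look something like:

tableEnv.registerTableSink("console", new PrintTableSink(resultTable.getSchema()));
resultTable.insertInto("console");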
>
> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote:
> Hi kant,
>
> AFAIK, there is no "print to stdout" sink for the Table API now; you can
> implement a custom sink following this doc [1].
>
> IMO, an out-of-the-box print table sink is very useful, and I've created
> an issue [2] to track this.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
> [2] https://issues.apache.org/jira/browse/FLINK-16354
>
> On Sun, Mar 1, 2020 at 2:30 AM kant kodali <kanth...@gmail.com> wrote:
> Hi,
>
> Thanks for the pointer. Looks like the documentation says to use
> tableEnv.registerTableSink; however, in my IDE it shows the method is
> deprecated in Flink 1.10. So I am still not seeing a way to add a sink
> that can print to stdout. What sink should I use to print to stdout, and
> how do I add it without converting into a DataStream?
>
> Thanks!
>
> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
>
> You shouldn't be using `KafkaTableSource`, as it's marked @Internal. It's
> not part of any public API.
>
> You don't have to convert a DataStream into a Table to read from Kafka in
> the Table API. I guess you could, if you had used the DataStream API's
> FlinkKafkaConsumer as it's documented here [1].
>
> But you should be able to use the Kafka Table connector directly, as
> described in the docs [2][3].
>
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>
>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>
>> Also, why do I need to convert to a DataStream to print the rows of a
>> table? Why not have a print method on the Table itself?
>>
>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
>> Hi All,
>>
>> Do I need to use the DataStream API or the Table API to construct
>> sources? I am just trying to read from Kafka and print it to the console.
>> And yes, I tried it with DataStreams and it works fine, but I want to do
>> it using the Table-related APIs. I don't see any documentation or a
>> sample on how to create a Kafka table source, or any other source, using
>> the TableSource APIs, so after some digging I wrote the following code.
>> My ultimate goal is to avoid the DataStream API as much as possible and
>> just use the Table API & SQL, but somehow I feel the Flink framework
>> focuses more on DataStream than on the SQL interface. Am I wrong? From
>> the user's perspective, wouldn't it make more sense to focus on SQL
>> interfaces for both streaming and batch?
>>
>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>> import org.apache.flink.table.api.DataTypes;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableSchema;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.table.sources.TableSource;
>> import org.apache.flink.types.Row;
>>
>> import java.io.IOException;
>> import java.util.Properties;
>>
>> public class Test {
>>
>>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>         @Override
>>         public Row deserialize(byte[] message) throws IOException {
>>             return Row.of(new String(message));
>>         }
>>     }
>>
>>     public static void main(String... args) throws Exception {
>>         Test test = new Test();
>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>                 .useBlinkPlanner()
>>                 .inStreamingMode()
>>                 .build();
>>
>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>         StreamTableEnvironment tableEnvironment =
>>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>
>>         TableSource tableSource = test.getKafkaTableSource();
>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>
>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>
>>         streamExecutionEnvironment.execute("Sample Job");
>>     }
>>
>>     public KafkaTableSource getKafkaTableSource() {
>>         TableSchema tableSchema =
>>                 TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("group.id", "test");
>>         return new KafkaTableSource(tableSchema, "test-topic1", properties,
>>                 new MyDeserializationSchema());
>>     }
>> }
>>
>> I get the following error:
>>
>> The program finished with the following exception:
>>
>> The implementation of the FlinkKafkaConsumerBase is not serializable.
>> The object probably contains or references non serializable fields.
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>> Test.main(Test.java:40)
>>
>> The error seems to be on the line
>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>> and I am not sure why it is not able to serialize?
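A guess about the serialization failure above (take it with a grain of salt):
MyDeserializationSchema is declared as a non-static inner class, so every
instance carries an implicit reference to the enclosing Test instance, which
is not Serializable; the ClosureCleaner then rejects the
FlinkKafkaConsumerBase that holds the schema. Declaring it static should
avoid that:

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.types.Row;

import java.io.IOException;

public class Test {

    // "static" removes the implicit reference to the enclosing,
    // non-serializable Test instance.
    public static class MyDeserializationSchema
            extends AbstractDeserializationSchema<Row> {
        @Override
        public Row deserialize(byte[] message) throws IOException {
            return Row.of(new String(message));
        }
    }

    // ... rest of the class as above ...
}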
>>
>> Thanks!
>
>
> --
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel: +86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn