Hi kant,

> Also why do I need to convert to DataStream to print the rows of a table? Why not have a print method in the Table itself?

Flink 1.10 introduces a utility class named TableUtils to convert a Table to List<Row>. This utility class is mainly intended for demonstration or testing and is only applicable to *small batch jobs* and small finite *append-only stream jobs*. The code looks like:

Table table = tEnv.sqlQuery("select ...");
List<Row> result = TableUtils.collectToList(table);
result.....
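A minimal sketch of the above, for illustration (editor's example; it assumes the blink planner's TableUtils shipped with flink-table-planner-blink 1.10, and, as noted, it is suitable only for small finite inputs):

import java.util.List;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.utils.TableUtils;
import org.apache.flink.types.Row;

// inside main(), with tEnv a TableEnvironment that already has a
// "kafka_source" table registered:
Table table = tEnv.sqlQuery("select * from kafka_source");
List<Row> rows = TableUtils.collectToList(table); // triggers execution, gathers the rows locally
rows.forEach(System.out::println);                // "print" without converting to a DataStream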
Currently, we are planning to implement Table#collect [1]; after that, Table#head and Table#print may also be introduced soon.

> The program finished with the following exception:

Please make sure that the Kafka version in the Test class and the Kafka version in the pom dependency are the same. I tested your code successfully.

Bests,
Godfrey

[1] https://issues.apache.org/jira/browse/FLINK-14807
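For illustration, a hypothetical pom.xml entry (editor's sketch; the artifact name assumes the Scala 2.11 build of Flink 1.10): the connector version suffix has to line up with the version passed to new Kafka().version("0.11") in the program.

<!-- hypothetical: must match new Kafka().version("0.11") in the code -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>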
Benchao Li <libenc...@gmail.com> wrote on Sun, Mar 1, 2020 at 4:44 PM:

> Hi kant,
>
> CSV format is an independent module; you need to add it as a dependency:
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-csv</artifactId>
>     <version>${flink.version}</version>
> </dependency>
>
> kant kodali <kanth...@gmail.com> wrote on Sun, Mar 1, 2020 at 3:43 PM:
>
>> ------------------------------------------------------------
>> The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>> at Test.main(Test.java:34)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>> ... 8 more
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
>>
>> Reason: Required context properties mismatch.
>> The matching candidates:
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> Mismatched properties:
>> 'connector.type' expects 'filesystem', but is 'kafka'
>>
>> The following properties are requested:
>> connector.property-version=1
>> connector.topic=test-topic1
>> connector.type=kafka
>> connector.version=0.11
>> format.property-version=1
>> format.type=csv
>> schema.0.data-type=VARCHAR(2147483647)
>> schema.0.name=f0
>> update-mode=append
>>
>> The following factories have been considered:
>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>> at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>> at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>> at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>> ... 34 more
>>
>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>
>>> Hi,
>>>
>>> > Thanks for the pointer. Looks like the documentation says to use tableEnv.registerTableSink however in my IDE it shows the method is deprecated in Flink 1.10.
>>>
>>> It looks like not all of the documentation was updated after the methods were deprecated. However, if you look at the Javadocs of the `registerTableSink` method, you can find an answer [1].
>>>
>>> > It still doesn't work because it says for CSV the connector.type should be filesystem, not Kafka.
>>>
>>> Can you post the full stack trace? As I'm not familiar with the Table API, maybe Timo or Dawid know what's going on here?
>>>
>>> Piotrek
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>>>
>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>>>
>>> Here is my updated code after digging through the source code (not sure if it is correct). It still doesn't work because it says for CSV the connector.type should be filesystem, not Kafka, but the documentation says it is supported.
>>>
>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>> import org.apache.flink.runtime.state.StateBackend;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.DataTypes;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>> import org.apache.flink.table.descriptors.Csv;
>>> import org.apache.flink.table.descriptors.Kafka;
>>> import org.apache.flink.table.descriptors.Schema;
>>>
>>> public class Test {
>>>
>>>     public static void main(String... args) throws Exception {
>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>                 .useBlinkPlanner()
>>>                 .inStreamingMode()
>>>                 .build();
>>>
>>>         StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         streamExecutionEnvironment.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>>>         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>
>>>         tableEnvironment
>>>                 .connect(
>>>                         new Kafka()
>>>                                 .version("0.11")
>>>                                 .topic("test-topic1"))
>>>                 .withFormat(new Csv())
>>>                 .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>                 .inAppendMode()
>>>                 .createTemporaryTable("kafka_source");
>>>
>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>
>>>         tableEnvironment
>>>                 .connect(
>>>                         new Kafka()
>>>                                 .version("0.11")
>>>                                 .topic("test-topic2"))
>>>                 .withFormat(new Csv())
>>>                 .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>                 .inAppendMode()
>>>                 .createTemporaryTable("kafka_target");
>>>
>>>         tableEnvironment.insertInto("kafka_target", resultTable);
>>>
>>>         tableEnvironment.execute("Sample Job");
>>>     }
>>> }
>>>
>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi Benchao,
>>>>
>>>> Agreed, a ConsoleSink would be very useful, but that is not the only problem here. The documentation says to use tableEnv.registerTableSink all over the place
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>>>> however that function is deprecated. So how do I add any other sink?
>>>>
>>>> Thanks!
>>>>
>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote:
>>>>
>>>>> Hi kant,
>>>>>
>>>>> AFAIK, there is no "print to stdout" sink for the Table API now; you can implement a custom sink following this doc [1].
>>>>>
>>>>> IMO, an out-of-the-box print table sink would be very useful, and I've created an issue [2] to track this.
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
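As an illustration of the pointer in [1] above, a rough, untested sketch of such a custom print sink against the Flink 1.10 interfaces (the class name and wiring are the editor's assumptions, not from the thread):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

// A hypothetical "print to stdout" sink for append-only tables; register it with
// tableEnvironment.registerTableSink("console", new PrintTableSink(schema))
// (deprecated in 1.10 but still functional) and then insert into "console".
public class PrintTableSink implements AppendStreamTableSink<Row> {

    private final TableSchema schema;

    public PrintTableSink(TableSchema schema) {
        this.schema = schema;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.print(); // each appended Row goes to stdout
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }

    @Override
    public DataType getConsumedDataType() {
        return schema.toRowDataType();
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return this; // simplified: this sketch ignores reconfiguration
    }
}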
>>>>> kant kodali <kanth...@gmail.com> wrote on Sun, Mar 1, 2020 at 2:30 AM:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the pointer. It looks like the documentation says to use tableEnv.registerTableSink; however, in my IDE the method shows as deprecated in Flink 1.10, so I am still not seeing a way to add a sink that can print to stdout. What sink should I use to print to stdout, and how do I add it without converting into a DataStream?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> You shouldn't be using `KafkaTableSource`, as it's marked @Internal. It's not part of any public API.
>>>>>>>
>>>>>>> You don't have to convert a DataStream into a Table to read from Kafka in the Table API. I guess you could, if you had used the DataStream API's FlinkKafkaConsumer as it's documented here [1].
>>>>>>>
>>>>>>> But you should be able to use the Kafka Table connector directly, as it is described in the docs [2][3].
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
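Following [2][3] above, a minimal sketch of the descriptor-based Kafka source (editor's example; the broker address, group id, and startup mode are assumptions):

tableEnvironment
        .connect(
                new Kafka()
                        .version("0.11")
                        .topic("test-topic1")
                        .property("bootstrap.servers", "localhost:9092") // assumed broker address
                        .property("group.id", "test")
                        .startFromEarliest())
        .withFormat(new Csv())
        .withSchema(new Schema().field("f0", DataTypes.STRING()))
        .inAppendMode()
        .createTemporaryTable("kafka_source");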
>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>
>>>>>>> Also, why do I need to convert to a DataStream to print the rows of a table? Why not have a print method on the Table itself?
>>>>>>>
>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> Do I need to use the DataStream API or the Table API to construct sources? I am just trying to read from Kafka and print it to the console. And yes, I tried it with DataStreams and it works fine, but I want to do it using the Table-related APIs. I don't see any documentation or a sample on how to create a Kafka table source, or any other source, using the TableSource APIs, so after some digging I wrote the following code. My ultimate goal is to avoid the DataStream API as much as possible and just use the Table API & SQL, but somehow I feel the Flink framework focuses more on DataStream than on the SQL interface. Am I wrong? From the user perspective, wouldn't it make more sense to focus on SQL interfaces for both streaming and batch?
>>>>>>>>
>>>>>>>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>>>> import org.apache.flink.types.Row;
>>>>>>>>
>>>>>>>> import java.io.IOException;
>>>>>>>> import java.util.Properties;
>>>>>>>>
>>>>>>>> public class Test {
>>>>>>>>
>>>>>>>>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>>>>>>>         @Override
>>>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>>>             return Row.of(new String(message));
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>         Test test = new Test();
>>>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>                 .inStreamingMode()
>>>>>>>>                 .build();
>>>>>>>>
>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>>
>>>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>>>>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>>>>>>
>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>
>>>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>>>         TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>>>>>>>         Properties properties = new Properties();
>>>>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>>>         properties.setProperty("group.id", "test");
>>>>>>>>         return new KafkaTableSource(tableSchema, "test-topic1", properties, new MyDeserializationSchema());
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> I get the following error:
>>>>>>>>
>>>>>>>> The program finished with the following exception:
>>>>>>>>
>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non-serializable fields.
>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>>>> Test.main(Test.java:40)
>>>>>>>>
>>>>>>>> The error seems to be on the line
>>>>>>>>
>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>
>>>>>>>> and I am not sure why it is not able to serialize?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>
>>>>> --
>>>>> Benchao Li
>>>>> School of Electronics Engineering and Computer Science, Peking University
>>>>> Tel: +86-15650713730
>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
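On the serialization question in the last quoted message above: the likely cause is that MyDeserializationSchema is a non-static inner class, so every instance holds an implicit reference to the enclosing Test object, which is not serializable; the FlinkKafkaConsumerBase wrapping the schema then fails the ClosureCleaner's serializability check. A sketch of the usual fix (editor's note, untested):

// In Test.java, declare the schema as a static nested class (or a
// top-level class) so it no longer captures the enclosing Test instance:
public static class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
    @Override
    public Row deserialize(byte[] message) throws IOException {
        return Row.of(new String(message));
    }
}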