Hi kant, the CSV format is a separate module, so you need to add it as a dependency.
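For reference, here is a rough sketch of the dependency set a Kafka + CSV Table API job like this usually needs on the classpath. The artifact names and the Scala 2.11 suffix are my assumptions for a Flink 1.10 build, so adjust them to your setup. You may already have some of them, but note that your stack trace only lists the two Csv*TableSourceFactory classes as candidates, so it is worth double-checking that the Kafka connector is also packaged into the jar you submit:

<!-- Sketch only; adjust versions and the Scala suffix to your build. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Kafka 0.11 table connector, matching .version("0.11") in the descriptor -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

If I remember the 1.10 descriptor API correctly, the Kafka() descriptor will usually also need connection properties (for example .property("bootstrap.servers", "localhost:9092")) and a startup mode such as .startFromEarliest(), but the factory lookup failure itself points at the format. On top of the dependencies above, the flink-csv module is the one to add: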
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>

kant kodali <kanth...@gmail.com> wrote on Sun, Mar 1, 2020 at 3:43 PM:

> ------------------------------------------------------------
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
> at Test.main(Test.java:34)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> ... 8 more
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Reason: Required context properties mismatch.
>
> The matching candidates:
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
>
> The following properties are requested:
> connector.property-version=1
> connector.topic=test-topic1
> connector.type=kafka
> connector.version=0.11
> format.property-version=1
> format.type=csv
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=f0
> update-mode=append
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
> at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
> at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
> ... 34 more
>
> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> > Thanks for the pointer. Looks like the documentation says to use
>> tableEnv.registerTableSink however in my IDE it shows the method is
>> deprecated in Flink 1.10.
>>
>> It looks like not all of the documentation was updated after the methods
>> were deprecated. However, if you look at the Javadocs of the `registerTableSink`
>> method, you can find an answer [1].
>>
>> > It still doesn't work because it says for CSV the connector.type should
>> be filesystem, not Kafka.
>>
>> Can you post the full stack trace? As I’m not familiar with the Table
>> API, maybe you, Timo or Dawid, know what’s going on here?
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>>
>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>>
>> Here is my updated code after digging through the source code (not sure
>> if it is correct). It still doesn't work because it says for CSV the
>> connector.type should be filesystem, not Kafka, but the documentation says it is
>> supported.
>> >> >> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; >> import org.apache.flink.runtime.state.StateBackend; >> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >> import org.apache.flink.table.api.DataTypes; >> import org.apache.flink.table.api.EnvironmentSettings; >> import org.apache.flink.table.api.Table; >> import org.apache.flink.table.api.java.StreamTableEnvironment; >> import org.apache.flink.table.descriptors.Csv; >> import org.apache.flink.table.descriptors.Kafka; >> import org.apache.flink.table.descriptors.Schema; >> >> public class Test { >> >> public static void main(String... args) throws Exception { >> EnvironmentSettings settings = EnvironmentSettings.newInstance() >> .useBlinkPlanner() >> .inStreamingMode() >> .build(); >> >> StreamExecutionEnvironment streamExecutionEnvironment = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> streamExecutionEnvironment.setStateBackend((StateBackend) new >> RocksDBStateBackend("file:///tmp/rocksdb")); >> StreamTableEnvironment tableEnvironment = >> StreamTableEnvironment.create(streamExecutionEnvironment, settings); >> >> tableEnvironment >> .connect( >> new Kafka() >> .version("0.11") >> .topic("test-topic1") >> ) >> .withFormat(new Csv()) >> .withSchema(new Schema().field("f0", DataTypes.STRING())) >> .inAppendMode() >> .createTemporaryTable("kafka_source"); >> >> Table resultTable = tableEnvironment.sqlQuery("select * from >> kafka_source"); >> >> tableEnvironment >> .connect( >> new Kafka() >> .version("0.11") >> .topic("test-topic2") >> ) >> .withFormat(new Csv()) >> .withSchema(new Schema().field("f0", DataTypes.STRING())) >> .inAppendMode() >> .createTemporaryTable("kafka_target"); >> >> tableEnvironment.insertInto("kafka_target", resultTable); >> >> tableEnvironment.execute("Sample Job"); >> } >> } >> >> >> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> wrote: >> >>> Hi Benchao, >>> >>> Agreed a ConsoleSink is very useful but that is not the only problem >>> here. Documentation says use tableEnv.registerTableSink all over the place >>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink >>> however that function is deprecated. so how do I add any other Sink? >>> >>> Thanks! >>> >>> >>> >>> >>> >>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote: >>> >>>> Hi kant, >>>> >>>> AFAIK, there is no "print to stdout" sink for Table API now, you can >>>> implement one custom sink following this doc[1]. >>>> >>>> IMO, an out-of-box print table sink is very useful, and I've created an >>>> issue[2] to track this. >>>> >>>> [1] >>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink >>>> [2] https://issues.apache.org/jira/browse/FLINK-16354 >>>> >>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 上午2:30写道: >>>> >>>>> Hi, >>>>> >>>>> Thanks for the pointer. Looks like the documentation says to use >>>>> tableEnv.registerTableSink however in my IDE it shows the method is >>>>> deprecated in Flink 1.10. so I am still not seeing a way to add a sink >>>>> that >>>>> can print to stdout? what sink should I use to print to stdout and how do >>>>> I >>>>> add it without converting into DataStream? >>>>> >>>>> Thanks! >>>>> >>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com> >>>>> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal. >>>>>> It’s not part of any public API. 
>>>>>> >>>>>> You don’t have to convert DataStream into Table to read from Kafka in >>>>>> Table API. I guess you could, if you had used DataStream API’s >>>>>> FlinkKafkaConsumer as it’s documented here [1]. >>>>>> >>>>>> But you should be able to use Kafka Table connector directly, as it >>>>>> is described in the docs [2][3]. >>>>>> >>>>>> Piotrek >>>>>> >>>>>> [1] >>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html >>>>>> [2] >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview >>>>>> [3] >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector >>>>>> >>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote: >>>>>> >>>>>> Also why do I need to convert to DataStream to print the rows of a >>>>>> table? Why not have a print method in the Table itself? >>>>>> >>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hi All, >>>>>>> >>>>>>> Do I need to use DataStream API or Table API to construct sources? I >>>>>>> am just trying to read from Kafka and print it to console. And yes I >>>>>>> tried >>>>>>> it with datastreams and it works fine but I want to do it using Table >>>>>>> related APIs. I don't see any documentation or a sample on how to create >>>>>>> Kafka table source or any other source using Table Source API's so after >>>>>>> some digging I wrote the following code. My ultimate goal is to avoid >>>>>>> Datastream API as much as possible and just use Table API & SQL but >>>>>>> somehow >>>>>>> I feel the Flink framework focuses on DataStream than the SQL >>>>>>> interface. am >>>>>>> I wrong? From the user perspective wouldn't it make more sense to focus >>>>>>> on >>>>>>> SQL interfaces for both streaming and batch? >>>>>>> >>>>>>> >>>>>>> import >>>>>>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema; >>>>>>> import >>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource; >>>>>>> import org.apache.flink.table.api.DataTypes; >>>>>>> import org.apache.flink.table.api.EnvironmentSettings; >>>>>>> import org.apache.flink.table.api.Table; >>>>>>> import org.apache.flink.table.api.TableSchema; >>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment; >>>>>>> import org.apache.flink.table.sources.TableSource; >>>>>>> import org.apache.flink.types.Row; >>>>>>> >>>>>>> import java.io.IOException; >>>>>>> import java.util.Properties; >>>>>>> >>>>>>> public class Test { >>>>>>> >>>>>>> public class MyDeserializationSchema extends >>>>>>> AbstractDeserializationSchema<Row> { >>>>>>> @Override >>>>>>> public Row deserialize(byte[] message) throws IOException { >>>>>>> return Row.of(new String(message)); >>>>>>> } >>>>>>> } >>>>>>> >>>>>>> public static void main(String... 
args) throws Exception { >>>>>>> Test test = new Test(); >>>>>>> EnvironmentSettings settings = EnvironmentSettings.newInstance() >>>>>>> .useBlinkPlanner() >>>>>>> .inStreamingMode() >>>>>>> .build(); >>>>>>> >>>>>>> StreamExecutionEnvironment streamExecutionEnvironment = >>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>> StreamTableEnvironment tableEnvironment = >>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings); >>>>>>> >>>>>>> TableSource tableSource = test.getKafkaTableSource(); >>>>>>> Table kafkaTable = >>>>>>> tableEnvironment.fromTableSource(tableSource); >>>>>>> tableEnvironment.createTemporaryView("kafka_source", >>>>>>> kafkaTable); >>>>>>> >>>>>>> Table resultTable = tableEnvironment.sqlQuery("select * from >>>>>>> kafka_source"); >>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print(); >>>>>>> >>>>>>> streamExecutionEnvironment.execute("Sample Job"); >>>>>>> } >>>>>>> >>>>>>> public KafkaTableSource getKafkaTableSource() { >>>>>>> TableSchema tableSchema = TableSchema.builder().field("f0", >>>>>>> DataTypes.STRING()).build(); >>>>>>> Properties properties = new Properties(); >>>>>>> properties.setProperty("bootstrap.servers", "localhost:9092"); >>>>>>> properties.setProperty("group.id", "test"); >>>>>>> return new KafkaTableSource(tableSchema, "test-topic1", >>>>>>> properties, new MyDeserializationSchema()); >>>>>>> } >>>>>>> } >>>>>>> >>>>>>> >>>>>>> I get the following error >>>>>>> >>>>>>> The program finished with the following exception: >>>>>>> >>>>>>> The implementation of the FlinkKafkaConsumerBase is not >>>>>>> serializable. The object probably contains or references non >>>>>>> serializable >>>>>>> fields. >>>>>>> >>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151) >>>>>>> >>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71) >>>>>>> >>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821) >>>>>>> >>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584) >>>>>>> >>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529) >>>>>>> >>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511) >>>>>>> >>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165) >>>>>>> >>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82) >>>>>>> >>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105) >>>>>>> >>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62) >>>>>>> >>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >>>>>>> >>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62) >>>>>>> >>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184) >>>>>>> >>>>>>> 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153) >>>>>>> >>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) >>>>>>> >>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >>>>>>> >>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) >>>>>>> >>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) >>>>>>> >>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) >>>>>>> >>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >>>>>>> >>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891) >>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54) >>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) >>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104) >>>>>>> >>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) >>>>>>> >>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) >>>>>>> >>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351) >>>>>>> >>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259) >>>>>>> >>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250) >>>>>>> Test.main(Test.java:40) >>>>>>> >>>>>>> The error seems to be on the line >>>>>>> >>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print(); >>>>>>> >>>>>>> and I am not sure why it is not able to serialize? >>>>>>> >>>>>>> Thanks! >>>>>>> >>>>>> >>>>>> >>>> >>>> -- >>>> >>>> Benchao Li >>>> School of Electronics Engineering and Computer Science, Peking University >>>> Tel:+86-15650713730 >>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn >>>> >>>> >> -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
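P.S. On the very first attempt further down this thread (the KafkaTableSource version that failed with "The implementation of the FlinkKafkaConsumerBase is not serializable"): a likely cause, judging only from the snippet, is that MyDeserializationSchema is declared as a non-static inner class of Test, so every instance carries a hidden reference to the enclosing Test object, which is not serializable. A minimal sketch of that one change:

// Sketch only: making the schema a static nested class (or a top-level class)
// removes the implicit reference to the non-serializable enclosing Test instance.
public static class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
    @Override
    public Row deserialize(byte[] message) throws IOException {
        // Wrap the raw Kafka record bytes into a single-field Row.
        return Row.of(new String(message));
    }
}

That said, as Piotr pointed out, KafkaTableSource is marked @Internal, so the descriptor-based connect() approach is the supported route anyway.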