------------------------------------------------------------
 The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
    at Test.main(Test.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 8 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.property-version=1
connector.topic=test-topic1
connector.type=kafka
connector.version=0.11
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=f0
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
    ... 34 more
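The factory list at the bottom of this trace contains only the filesystem CSV factories, which usually indicates that the Kafka table factory (e.g. flink-connector-kafka-0.11) and the CSV row format factory (flink-csv) are not on the job's classpath, so the only remaining candidate insists on connector.type=filesystem. As a rough sketch under that assumption (not code from this thread; the topic, servers, and field name are taken from the properties listed above, and the DDL property keys follow the Flink 1.10 connector documentation), the same source could also be declared with DDL once those dependencies are added:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class KafkaCsvDdlSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

            // Same connector/format properties as in the exception above, declared via DDL.
            // Resolving 'connector.type' = 'kafka' and 'format.type' = 'csv' still requires the
            // Kafka connector and CSV format factories to be on the classpath.
            tEnv.sqlUpdate(
                    "CREATE TABLE kafka_source (\n" +
                    "  f0 STRING\n" +
                    ") WITH (\n" +
                    "  'connector.type' = 'kafka',\n" +
                    "  'connector.version' = '0.11',\n" +
                    "  'connector.topic' = 'test-topic1',\n" +
                    "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
                    "  'format.type' = 'csv',\n" +
                    "  'update-mode' = 'append'\n" +
                    ")");

            Table result = tEnv.sqlQuery("SELECT * FROM kafka_source");

            // Printing the rows still goes through a DataStream conversion in 1.10.
            tEnv.toAppendStream(result, Row.class).print();

            env.execute("Kafka CSV DDL sketch");
        }
    }

The descriptor-based connect(...) code further down in the thread requests the same properties, so it should resolve the same way once the missing factories are present.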
On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> > Thanks for the pointer. Looks like the documentation says to use
> > tableEnv.registerTableSink, however in my IDE it shows the method is
> > deprecated in Flink 1.10.
>
> It looks like not all of the documentation was updated after methods were
> deprecated. However, if you look at the Javadoc of the `registerTableSink`
> method, you can find an answer [1].
>
> > It still doesn't work, because it says that for CSV the connector.type
> > should be filesystem, not Kafka.
>
> Can you post the full stack trace? As I'm not familiar with the Table API,
> maybe you, Timo, or Dawid know what's going on here?
>
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>
> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>
> Here is my updated code after digging through the source code (not sure if
> it is correct). It still doesn't work, because it says that for CSV the
> connector.type should be filesystem, not Kafka, but the documentation says
> it is supported.
>
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
>
> public class Test {
>
>     public static void main(String... args) throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>
>         StreamExecutionEnvironment streamExecutionEnvironment =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         streamExecutionEnvironment.setStateBackend(
>                 (StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>         StreamTableEnvironment tableEnvironment =
>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>
>         tableEnvironment
>                 .connect(
>                         new Kafka()
>                                 .version("0.11")
>                                 .topic("test-topic1")
>                 )
>                 .withFormat(new Csv())
>                 .withSchema(new Schema().field("f0", DataTypes.STRING()))
>                 .inAppendMode()
>                 .createTemporaryTable("kafka_source");
>
>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>
>         tableEnvironment
>                 .connect(
>                         new Kafka()
>                                 .version("0.11")
>                                 .topic("test-topic2")
>                 )
>                 .withFormat(new Csv())
>                 .withSchema(new Schema().field("f0", DataTypes.STRING()))
>                 .inAppendMode()
>                 .createTemporaryTable("kafka_target");
>
>         tableEnvironment.insertInto("kafka_target", resultTable);
>
>         tableEnvironment.execute("Sample Job");
>     }
> }
>
> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Benchao,
>>
>> Agreed, a ConsoleSink is very useful, but that is not the only problem
>> here. The documentation says to use tableEnv.registerTableSink all over
>> the place
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>> but that method is deprecated. So how do I add any other sink?
>>
>> Thanks!
>>
>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote:
>>
>>> Hi kant,
>>>
>>> AFAIK, there is no "print to stdout" sink for the Table API yet; you can
>>> implement a custom sink following this doc [1].
>>>
>>> IMO, an out-of-the-box print table sink is very useful, and I've created
>>> an issue [2] to track this.
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>
>>> On Sun, Mar 1, 2020 at 2:30 AM kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for the pointer. Looks like the documentation says to use
>>>> tableEnv.registerTableSink, however in my IDE it shows the method is
>>>> deprecated in Flink 1.10, so I am still not seeing a way to add a sink
>>>> that can print to stdout. What sink should I use to print to stdout, and
>>>> how do I add it without converting into a DataStream?
>>>>
>>>> Thanks!
>>>>
>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> You shouldn't be using `KafkaTableSource` as it's marked @Internal.
>>>>> It's not part of any public API.
>>>>>
>>>>> You don't have to convert a DataStream into a Table to read from Kafka
>>>>> in the Table API. I guess you could, if you had used the DataStream API's
>>>>> FlinkKafkaConsumer, as it's documented here [1].
>>>>>
>>>>> But you should be able to use the Kafka Table connector directly, as it
>>>>> is described in the docs [2][3].
>>>>>
>>>>> Piotrek
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
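For reference, here is a minimal sketch of the DataStream route Piotr mentions in [1] above (not code from this thread; the topic name, bootstrap servers, and the single field f0 are assumptions carried over from the earlier snippets): read from Kafka with FlinkKafkaConsumer011, register the stream as a table, and query it with SQL.

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    import java.util.Properties;

    public class KafkaDataStreamToTableSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

            // Plain DataStream source: FlinkKafkaConsumer011 from flink-connector-kafka-0.11
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "test");
            DataStream<String> lines = env.addSource(
                    new FlinkKafkaConsumer011<>("test-topic1", new SimpleStringSchema(), props));

            // Expose the stream as a table with a single field f0 and query it with SQL
            Table kafkaTable = tEnv.fromDataStream(lines, "f0");
            tEnv.createTemporaryView("kafka_source", kafkaTable);
            Table result = tEnv.sqlQuery("SELECT * FROM kafka_source");

            // Printing still goes through a DataStream conversion in Flink 1.10
            tEnv.toAppendStream(result, Row.class).print();

            env.execute("Kafka DataStream to Table sketch");
        }
    }

This sidesteps the table factory lookup entirely, at the cost of going through the DataStream API that the thread is trying to avoid.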
>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>> Also, why do I need to convert to a DataStream to print the rows of a
>>>>> table? Why not have a print method on the Table itself?
>>>>>
>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> Do I need to use the DataStream API or the Table API to construct
>>>>>> sources? I am just trying to read from Kafka and print it to the console.
>>>>>> And yes, I tried it with DataStreams and it works fine, but I want to do
>>>>>> it using the Table-related APIs. I don't see any documentation or sample
>>>>>> on how to create a Kafka table source, or any other source, using the
>>>>>> table source APIs, so after some digging I wrote the following code. My
>>>>>> ultimate goal is to avoid the DataStream API as much as possible and just
>>>>>> use the Table API & SQL, but somehow I feel the Flink framework focuses
>>>>>> more on DataStream than on the SQL interface. Am I wrong? From the user's
>>>>>> perspective, wouldn't it make more sense to focus on SQL interfaces for
>>>>>> both streaming and batch?
>>>>>>
>>>>>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>> import org.apache.flink.table.api.Table;
>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>> import org.apache.flink.types.Row;
>>>>>>
>>>>>> import java.io.IOException;
>>>>>> import java.util.Properties;
>>>>>>
>>>>>> public class Test {
>>>>>>
>>>>>>     public class MyDeserializationSchema extends
>>>>>>             AbstractDeserializationSchema<Row> {
>>>>>>         @Override
>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>             return Row.of(new String(message));
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     public static void main(String... args) throws Exception {
>>>>>>         Test test = new Test();
>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>                 .useBlinkPlanner()
>>>>>>                 .inStreamingMode()
>>>>>>                 .build();
>>>>>>
>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>         StreamTableEnvironment tableEnvironment =
>>>>>>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>
>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>>>>
>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>
>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>     }
>>>>>>
>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>         TableSchema tableSchema = TableSchema.builder()
>>>>>>                 .field("f0", DataTypes.STRING()).build();
>>>>>>         Properties properties = new Properties();
>>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>         properties.setProperty("group.id", "test");
>>>>>>         return new KafkaTableSource(tableSchema, "test-topic1",
>>>>>>                 properties, new MyDeserializationSchema());
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> I get the following error:
>>>>>>
>>>>>> The program finished with the following exception:
>>>>>>
>>>>>> The implementation of the FlinkKafkaConsumerBase is not serializable.
>>>>>> The object probably contains or references non serializable fields.
>>>>>>
>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>> Test.main(Test.java:40)
>>>>>>
>>>>>> The error seems to be on the line
>>>>>>
>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>
>>>>>> and I am not sure why it is not able to serialize.
>>>>>>
>>>>>> Thanks!
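A likely explanation for this last error, offered here as a note rather than something stated in the thread: MyDeserializationSchema is declared as a non-static inner class, so every instance carries an implicit reference to the enclosing Test object, which is not serializable; the FlinkKafkaConsumer holds the deserialization schema, so the ClosureCleaner check fails. A minimal sketch of the usual fix is to make the schema a static nested class (the rest of the class stays as above):

    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    import org.apache.flink.types.Row;

    import java.io.IOException;

    public class Test {

        // Static nested class: no implicit reference to an enclosing Test instance,
        // so the schema (and the FlinkKafkaConsumer that holds it) stays serializable.
        public static class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
            @Override
            public Row deserialize(byte[] message) throws IOException {
                return Row.of(new String(message));
            }
        }

        // ... rest of the class as above; getKafkaTableSource() can then also be static,
        // since new MyDeserializationSchema() no longer needs a Test instance.
    }

Note that KafkaTableSource itself is still marked @Internal, as Piotr points out above, so this only addresses the serialization failure, not the use of a non-public API.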
>>>
>>> --
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel: +86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn