I think you should use `flink-sql-connector-kafka*-0.11*_2.11` instead of
`flink-connector-kafka_2.11`.

Bests,
Godfrey

kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午5:15写道:

> The dependency was already there. Below is my build.gradle. Also I checked
> the kafka version and looks like the jar
>
> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>
> downloads kafka-clients version 2.2.0. So I changed my code to version
> 2.2.0 and same problem persists.
>
> buildscript {
>     repositories {
>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>     }
>     dependencies {
>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>     }
> }
>
> plugins {
>     id 'java'
>     id 'application'
> }
>
> mainClassName = 'Test'
> apply plugin: 'com.github.johnrengelman.shadow'
>
> ext {
>     javaVersion = '1.8'
>     flinkVersion = '1.10.0'
>     scalaBinaryVersion = '2.11'
>     slf4jVersion = '1.7.7'
>     log4jVersion = '1.2.17'
> }
>
>
> sourceCompatibility = javaVersion
> targetCompatibility = javaVersion
> tasks.withType(JavaCompile) {
>     options.encoding = 'UTF-8'
> }
>
> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>
> // declare where to find the dependencies of your project
> repositories {
>     mavenCentral()
>     maven { url 
> "https://repository.apache.org/content/repositories/snapshots/"; }
> }
>
> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we 
> could not run code
> // in the IDE or with "gradle run". We also cannot exclude transitive 
> dependencies from the
> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
> // -> Explicitly define the // libraries we want to be included in the 
> "flinkShadowJar" configuration!
> configurations {
>     flinkShadowJar // dependencies which go into the shadowJar
>
>     // always exclude these (also from transitive dependencies) since they 
> are provided by Flink
>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>     flinkShadowJar.exclude group: 'org.slf4j'
>     flinkShadowJar.exclude group: 'log4j'
> }
>
> // declare the dependencies for your production and test code
> dependencies {
>     // --------------------------------------------------------------
>     // Compile-time dependencies that should NOT be part of the
>     // shadow jar and are provided in the lib folder of Flink
>     // --------------------------------------------------------------
>     compile "org.apache.flink:flink-java:${flinkVersion}"
>     compile 
> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>
>     // --------------------------------------------------------------
>     // Dependencies that should be part of the shadow jar, e.g.
>     // connectors. These must be in the flinkShadowJar configuration!
>     // --------------------------------------------------------------
>
>     compile "org.apache.flink:flink-java:${flinkVersion}"
>     compile 
> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>     flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
>
>     flinkShadowJar 
> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>     flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>     compileOnly 
> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>     compileOnly 
> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>     flinkShadowJar 
> "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>     flinkShadowJar 
> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>
>
>     compile "log4j:log4j:${log4jVersion}"
>     compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>
>     // Add test dependencies here.
>     // testCompile "junit:junit:4.12"
>     testImplementation 
> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>     testImplementation 
> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
> }
>
> // make compileOnly dependencies available for tests:
> sourceSets {
>     main.compileClasspath += configurations.flinkShadowJar
>     main.runtimeClasspath += configurations.flinkShadowJar
>
>     test.compileClasspath += configurations.flinkShadowJar
>     test.runtimeClasspath += configurations.flinkShadowJar
>
>     javadoc.classpath += configurations.flinkShadowJar
> }
>
> run.classpath = sourceSets.main.runtimeClasspath
>
> jar {
>     manifest {
>         attributes 'Built-By': System.getProperty('user.name'),
>                 'Build-Jdk': System.getProperty('java.version')
>     }
> }
>
> shadowJar {
>     configurations = [project.configurations.flinkShadowJar]
> }
>
>
>
>
> On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com> wrote:
>
>> hi kant,
>>
>> > Also why do I need to convert to DataStream to print the rows of a
>> table? Why not have a print method in the Table itself?
>> Flink 1.10 introduces a utility class named TableUtils to convert a Table
>> to List<Row>, this utility class is mainly used for demonstration or
>> testing and is only applicable for *small batch jobs* and small finite 
>> *append
>> only stream jobs*.  code like:
>> Table table = tEnv.sqlQuery("select ...");
>> List<Row> result = TableUtils.collectToList(table);
>> result.....
>>
>> currently, we are planner to implement Table#collect[1], after
>> that Table#head and Table#print may be also introduced soon.
>>
>> >  The program finished with the following exception:
>> please make sure that the kafka version in Test class and the kafka
>> version in pom dependency are same. I tested your code successfully.
>>
>> Bests,
>> Godfrey
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-14807
>>
>>
>> Benchao Li <libenc...@gmail.com> 于2020年3月1日周日 下午4:44写道:
>>
>>> Hi kant,
>>>
>>> CSV format is an independent module, you need to add it as your
>>> dependency.
>>>
>>> <dependency>
>>>    <groupId>org.apache.flink</groupId>
>>>    <artifactId>flink-csv</artifactId>
>>>    <version>${flink.version}</version>
>>> </dependency>
>>>
>>>
>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午3:43写道:
>>>
>>>> ------------------------------------------------------------
>>>>  The program finished with the following exception:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>> method caused an error: findAndCreateTableSource failed.
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>> at
>>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>> at
>>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>> at
>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>>> at
>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>>> at
>>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>>> Caused by: org.apache.flink.table.api.TableException:
>>>> findAndCreateTableSource failed.
>>>> at
>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>> at
>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>>>> at
>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>>>> at
>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>>>> at
>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>>>> at
>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>> at
>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>> at
>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>>>> at Test.main(Test.java:34)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>> ... 8 more
>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>>> Could not find a suitable table factory for
>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>>> the classpath.
>>>>
>>>> Reason: Required context properties mismatch.
>>>>
>>>> The matching candidates:
>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>> Mismatched properties:
>>>> 'connector.type' expects 'filesystem', but is 'kafka'
>>>>
>>>> The following properties are requested:
>>>> connector.property-version=1
>>>> connector.topic=test-topic1
>>>> connector.type=kafka
>>>> connector.version=0.11
>>>> format.property-version=1
>>>> format.type=csv
>>>> schema.0.data-type=VARCHAR(2147483647)
>>>> schema.0.name=f0
>>>> update-mode=append
>>>>
>>>> The following factories have been considered:
>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>> at
>>>> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>>> at
>>>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>>> at
>>>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>> at
>>>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>> at
>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>>>> ... 34 more
>>>>
>>>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> > Thanks for the pointer. Looks like the documentation says to use
>>>>> tableEnv.registerTableSink however in my IDE it shows the method is
>>>>> deprecated in Flink 1.10.
>>>>>
>>>>> It looks like not all of the documentation was updated after methods
>>>>> were deprecated. However if you look at the java docs of the
>>>>> `registerTableSink` method, you can find an answer [1].
>>>>>
>>>>> >  It sill doesnt work because it says for CSV the connector.type
>>>>> should be filesystem not Kafka.
>>>>>
>>>>> Can you post the full stack trace? As I’m not familiar with the Table
>>>>> API, maybe you Timo or Dawid know what’s going on here?
>>>>>
>>>>> Piotrek
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>>>>>
>>>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>> Here is my updated code after digging through the source code (not
>>>>> sure if it is correct ). It sill doesnt work because it says for CSV the
>>>>> connector.type should be filesystem not Kafka but documentation says it is
>>>>> supported.
>>>>>
>>>>>
>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>> import org.apache.flink.runtime.state.StateBackend;
>>>>> import 
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.table.api.DataTypes;
>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>> import org.apache.flink.table.api.Table;
>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>> import org.apache.flink.table.descriptors.Csv;
>>>>> import org.apache.flink.table.descriptors.Kafka;
>>>>> import org.apache.flink.table.descriptors.Schema;
>>>>>
>>>>> public class Test {
>>>>>
>>>>>     public static void main(String... args) throws Exception {
>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>                 .useBlinkPlanner()
>>>>>                 .inStreamingMode()
>>>>>                 .build();
>>>>>
>>>>>         StreamExecutionEnvironment streamExecutionEnvironment = 
>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         streamExecutionEnvironment.setStateBackend((StateBackend) new 
>>>>> RocksDBStateBackend("file:///tmp/rocksdb"));
>>>>>         StreamTableEnvironment tableEnvironment = 
>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>
>>>>>         tableEnvironment
>>>>>             .connect(
>>>>>                 new Kafka()
>>>>>                     .version("0.11")
>>>>>                     .topic("test-topic1")
>>>>>             )
>>>>>             .withFormat(new Csv())
>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>             .inAppendMode()
>>>>>             .createTemporaryTable("kafka_source");
>>>>>
>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from 
>>>>> kafka_source");
>>>>>
>>>>>         tableEnvironment
>>>>>             .connect(
>>>>>                 new Kafka()
>>>>>                     .version("0.11")
>>>>>                     .topic("test-topic2")
>>>>>             )
>>>>>             .withFormat(new Csv())
>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>             .inAppendMode()
>>>>>             .createTemporaryTable("kafka_target");
>>>>>
>>>>>         tableEnvironment.insertInto("kafka_target", resultTable);
>>>>>
>>>>>         tableEnvironment.execute("Sample Job");
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Benchao,
>>>>>>
>>>>>> Agreed a ConsoleSink is very useful but that is not the only problem
>>>>>> here. Documentation says use  tableEnv.registerTableSink all over the 
>>>>>> place
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>>>>>> however that function is deprecated. so how do I add any other Sink?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi kant,
>>>>>>>
>>>>>>> AFAIK, there is no "print to stdout" sink for Table API now, you can
>>>>>>> implement one custom sink following this doc[1].
>>>>>>>
>>>>>>> IMO, an out-of-box print table sink is very useful, and I've created
>>>>>>> an issue[2] to track this.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>>>>>
>>>>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 上午2:30写道:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks for the pointer. Looks like the documentation says to use
>>>>>>>> tableEnv.registerTableSink however in my IDE it shows the method is
>>>>>>>> deprecated in Flink 1.10. so I am still not seeing a way to add a sink 
>>>>>>>> that
>>>>>>>> can print to stdout? what sink should I use to print to stdout and how 
>>>>>>>> do I
>>>>>>>> add it without converting into DataStream?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked
>>>>>>>>> @Internal. It’s not part of any public API.
>>>>>>>>>
>>>>>>>>> You don’t have to convert DataStream into Table to read from Kafka
>>>>>>>>> in Table API. I guess you could, if you had used DataStream API’s
>>>>>>>>> FlinkKafkaConsumer as it’s documented here [1].
>>>>>>>>>
>>>>>>>>> But you should be able to use Kafka Table connector directly, as
>>>>>>>>> it is described in the docs [2][3].
>>>>>>>>>
>>>>>>>>> Piotrek
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>>>>>> [3]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>>>>>
>>>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Also why do I need to convert to DataStream to print the rows of a
>>>>>>>>> table? Why not have a print method in the Table itself?
>>>>>>>>>
>>>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> Do I need to use DataStream API or Table API to construct
>>>>>>>>>> sources? I am just trying to read from Kafka and print it to 
>>>>>>>>>> console. And
>>>>>>>>>> yes I tried it with datastreams and it works fine but I want to do 
>>>>>>>>>> it using
>>>>>>>>>> Table related APIs. I don't see any documentation or a sample on how 
>>>>>>>>>> to
>>>>>>>>>> create Kafka table source or any other source using Table Source 
>>>>>>>>>> API's so
>>>>>>>>>> after some digging I wrote the following code. My ultimate goal is 
>>>>>>>>>> to avoid
>>>>>>>>>> Datastream API as much as possible and just use Table API & SQL but 
>>>>>>>>>> somehow
>>>>>>>>>> I feel the Flink framework focuses on DataStream than the SQL 
>>>>>>>>>> interface. am
>>>>>>>>>> I wrong? From the user perspective wouldn't it make more sense to 
>>>>>>>>>> focus on
>>>>>>>>>> SQL interfaces for both streaming and batch?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> import 
>>>>>>>>>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>>>>>> import 
>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>>>>>> import org.apache.flink.types.Row;
>>>>>>>>>>
>>>>>>>>>> import java.io.IOException;
>>>>>>>>>> import java.util.Properties;
>>>>>>>>>>
>>>>>>>>>> public class Test {
>>>>>>>>>>
>>>>>>>>>>     public class MyDeserializationSchema extends 
>>>>>>>>>> AbstractDeserializationSchema<Row> {
>>>>>>>>>>         @Override
>>>>>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>>>>>             return Row.of(new String(message));
>>>>>>>>>>         }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>>>         Test test = new Test();
>>>>>>>>>>         EnvironmentSettings settings = 
>>>>>>>>>> EnvironmentSettings.newInstance()
>>>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>>>                 .inStreamingMode()
>>>>>>>>>>                 .build();
>>>>>>>>>>
>>>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment = 
>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>         StreamTableEnvironment tableEnvironment = 
>>>>>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>>>>
>>>>>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>>>>>         Table kafkaTable = 
>>>>>>>>>> tableEnvironment.fromTableSource(tableSource);
>>>>>>>>>>         tableEnvironment.createTemporaryView("kafka_source", 
>>>>>>>>>> kafkaTable);
>>>>>>>>>>
>>>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from 
>>>>>>>>>> kafka_source");
>>>>>>>>>>         tableEnvironment.toAppendStream(resultTable, 
>>>>>>>>>> Row.class).print();
>>>>>>>>>>
>>>>>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>>>>>         TableSchema tableSchema = TableSchema.builder().field("f0", 
>>>>>>>>>> DataTypes.STRING()).build();
>>>>>>>>>>         Properties properties = new Properties();
>>>>>>>>>>         properties.setProperty("bootstrap.servers", 
>>>>>>>>>> "localhost:9092");
>>>>>>>>>>         properties.setProperty("group.id", "test");
>>>>>>>>>>         return new KafkaTableSource(tableSchema, "test-topic1", 
>>>>>>>>>> properties, new MyDeserializationSchema());
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I get the following error
>>>>>>>>>>
>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>
>>>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not
>>>>>>>>>> serializable. The object probably contains or references non 
>>>>>>>>>> serializable
>>>>>>>>>> fields.
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>>>>>>
>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>
>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>>>
>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>>>>>> Test.main(Test.java:40)
>>>>>>>>>>
>>>>>>>>>> The error seems to be on the line
>>>>>>>>>>
>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>>
>>>>>>>>>> and I am not sure why it is not able to serialize?
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Benchao Li
>>>>>>> School of Electronics Engineering and Computer Science, Peking 
>>>>>>> University
>>>>>>> Tel:+86-15650713730
>>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>>>>
>>>>>>>
>>>>>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>

Reply via email to