I think you should use `flink-sql-connector-kafka-0.11_2.11` instead of `flink-connector-kafka_2.11`.
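To illustrate why the jar choice matters, here is a minimal plain-Java sketch of context-based factory matching, which is what the `NoMatchingTableFactoryException` quoted further down reports. It is not Flink's actual code: the class `FactoryMatchSketch`, its helper methods, and the factory names `KafkaUniversalFactory` and `Kafka011Factory` are hypothetical stand-ins. It also assumes that the plain `flink-connector-kafka_2.11` artifact registers under `connector.version=universal`, which is worth checking against the Flink 1.10 connector documentation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of context-based table factory matching.
// This is NOT Flink's TableFactoryService; all names here are illustrative.
class FactoryMatchSketch {

    // A factory reduced to a name plus the properties it requires.
    static final class Factory {
        final String name;
        final Map<String, String> requiredContext;

        Factory(String name, Map<String, String> requiredContext) {
            this.name = name;
            this.requiredContext = requiredContext;
        }
    }

    // Builds a property map from alternating key/value arguments.
    static Map<String, String> props(String... keyValues) {
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            map.put(keyValues[i], keyValues[i + 1]);
        }
        return map;
    }

    // A subset of the properties requested in the stack trace from the thread.
    static Map<String, String> requestedFromThread() {
        return props(
                "connector.type", "kafka",
                "connector.version", "0.11",
                "format.type", "csv",
                "update-mode", "append");
    }

    // A classpath on which only a CSV filesystem factory is discoverable.
    static List<Factory> csvOnlyClasspath() {
        List<Factory> factories = new ArrayList<>();
        factories.add(new Factory("CsvAppendTableSourceFactory",
                props("connector.type", "filesystem", "format.type", "csv")));
        return factories;
    }

    // The same classpath plus two hypothetical Kafka factories.
    static List<Factory> classpathWithKafkaFactories() {
        List<Factory> factories = csvOnlyClasspath();
        factories.add(new Factory("KafkaUniversalFactory",
                props("connector.type", "kafka", "connector.version", "universal")));
        factories.add(new Factory("Kafka011Factory",
                props("connector.type", "kafka", "connector.version", "0.11")));
        return factories;
    }

    // Returns the names of all factories whose required context is fully
    // satisfied by the requested properties.
    static List<String> findMatching(List<Factory> factories, Map<String, String> requested) {
        List<String> matches = new ArrayList<>();
        for (Factory factory : factories) {
            boolean allMatch = true;
            for (Map.Entry<String, String> required : factory.requiredContext.entrySet()) {
                if (!required.getValue().equals(requested.get(required.getKey()))) {
                    allMatch = false;
                    break;
                }
            }
            if (allMatch) {
                matches.add(factory.name);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Only the CSV factory is visible: nothing matches connector.type=kafka.
        System.out.println(findMatching(csvOnlyClasspath(), requestedFromThread()));
        // With a 0.11 factory present, exactly that one matches.
        System.out.println(findMatching(classpathWithKafkaFactories(), requestedFromThread()));
    }
}
```

In this model a request for `connector.version=0.11` is only satisfied when a factory declaring that exact version is discoverable, which is why a jar that does not contribute such a factory leaves only the CSV candidates.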
Bests,
Godfrey

kant kodali <kanth...@gmail.com> wrote on Sunday, March 1, 2020 at 5:15 PM:

> The dependency was already there. Below is my build.gradle. Also I checked
> the kafka version and looks like the jar
>
> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>
> downloads kafka-clients version 2.2.0. So I changed my code to version
> 2.2.0 and same problem persists.
>
> buildscript {
>     repositories {
>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>     }
>     dependencies {
>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>     }
> }
>
> plugins {
>     id 'java'
>     id 'application'
> }
>
> mainClassName = 'Test'
> apply plugin: 'com.github.johnrengelman.shadow'
>
> ext {
>     javaVersion = '1.8'
>     flinkVersion = '1.10.0'
>     scalaBinaryVersion = '2.11'
>     slf4jVersion = '1.7.7'
>     log4jVersion = '1.2.17'
> }
>
> sourceCompatibility = javaVersion
> targetCompatibility = javaVersion
> tasks.withType(JavaCompile) {
>     options.encoding = 'UTF-8'
> }
>
> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>
> // declare where to find the dependencies of your project
> repositories {
>     mavenCentral()
>     maven { url "https://repository.apache.org/content/repositories/snapshots/" }
> }
>
> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
> // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
> // -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!
> configurations {
>     flinkShadowJar // dependencies which go into the shadowJar
>
>     // always exclude these (also from transitive dependencies) since they are provided by Flink
>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>     flinkShadowJar.exclude group: 'org.slf4j'
>     flinkShadowJar.exclude group: 'log4j'
> }
>
> // declare the dependencies for your production and test code
> dependencies {
>     // --------------------------------------------------------------
>     // Compile-time dependencies that should NOT be part of the
>     // shadow jar and are provided in the lib folder of Flink
>     // --------------------------------------------------------------
>     compile "org.apache.flink:flink-java:${flinkVersion}"
>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>
>     // --------------------------------------------------------------
>     // Dependencies that should be part of the shadow jar, e.g.
>     // connectors. These must be in the flinkShadowJar configuration!
>     // --------------------------------------------------------------
>
>     compile "org.apache.flink:flink-java:${flinkVersion}"
>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>     flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
>
>     flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>     flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>     compileOnly "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>     compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>     flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>     flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>
>     compile "log4j:log4j:${log4jVersion}"
>     compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>
>     // Add test dependencies here.
>     // testCompile "junit:junit:4.12"
>     testImplementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>     testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
> }
>
> // make compileOnly dependencies available for tests:
> sourceSets {
>     main.compileClasspath += configurations.flinkShadowJar
>     main.runtimeClasspath += configurations.flinkShadowJar
>
>     test.compileClasspath += configurations.flinkShadowJar
>     test.runtimeClasspath += configurations.flinkShadowJar
>
>     javadoc.classpath += configurations.flinkShadowJar
> }
>
> run.classpath = sourceSets.main.runtimeClasspath
>
> jar {
>     manifest {
>         attributes 'Built-By': System.getProperty('user.name'),
>                    'Build-Jdk': System.getProperty('java.version')
>     }
> }
>
> shadowJar {
>     configurations = [project.configurations.flinkShadowJar]
> }
>
> On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com> wrote:
>
>> hi kant,
>>
>> > Also why do I need to convert to DataStream to print the rows of a
>> table? Why not have a print method in the Table itself?
>>
>> Flink 1.10 introduces a utility class named TableUtils to convert a Table
>> to List<Row>. This utility class is mainly used for demonstration or
>> testing and is only applicable for *small batch jobs* and small finite
>> *append only stream jobs*. Code like:
>>
>> Table table = tEnv.sqlQuery("select ...");
>> List<Row> result = TableUtils.collectToList(table);
>> result.....
>>
>> Currently, we are planning to implement Table#collect [1]; after
>> that, Table#head and Table#print may also be introduced soon.
>>
>> > The program finished with the following exception:
>>
>> Please make sure that the kafka version in the Test class and the kafka
>> version in the pom dependency are the same. I tested your code successfully.
>>
>> Bests,
>> Godfrey
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-14807
>>
>> Benchao Li <libenc...@gmail.com> wrote on Sunday, March 1, 2020 at 4:44 PM:
>>
>>> Hi kant,
>>>
>>> CSV format is an independent module; you need to add it as your
>>> dependency.
>>>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-csv</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>>
>>> kant kodali <kanth...@gmail.com> wrote on Sunday, March 1, 2020 at 3:43 PM:
>>>
>>>> ------------------------------------------------------------
>>>> The program finished with the following exception:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>> method caused an error: findAndCreateTableSource failed.
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>>> Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>>>>     at Test.main(Test.java:34)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>>     ... 8 more
>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>>> Could not find a suitable table factory for
>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>>> the classpath.
>>>>
>>>> Reason: Required context properties mismatch.
>>>>
>>>> The matching candidates:
>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>> Mismatched properties:
>>>> 'connector.type' expects 'filesystem', but is 'kafka'
>>>>
>>>> The following properties are requested:
>>>> connector.property-version=1
>>>> connector.topic=test-topic1
>>>> connector.type=kafka
>>>> connector.version=0.11
>>>> format.property-version=1
>>>> format.type=csv
>>>> schema.0.data-type=VARCHAR(2147483647)
>>>> schema.0.name=f0
>>>> update-mode=append
>>>>
>>>> The following factories have been considered:
>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>     at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>>>     at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>>>     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>>>>     ... 34 more
>>>>
>>>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> > Thanks for the pointer. Looks like the documentation says to use
>>>>> tableEnv.registerTableSink however in my IDE it shows the method is
>>>>> deprecated in Flink 1.10.
>>>>>
>>>>> It looks like not all of the documentation was updated after methods
>>>>> were deprecated.
However if you look at the java docs of the >>>>> `registerTableSink` method, you can find an answer [1]. >>>>> >>>>> > It sill doesnt work because it says for CSV the connector.type >>>>> should be filesystem not Kafka. >>>>> >>>>> Can you post the full stack trace? As I’m not familiar with the Table >>>>> API, maybe you Timo or Dawid know what’s going on here? >>>>> >>>>> Piotrek >>>>> >>>>> [1] >>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink- >>>>> >>>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote: >>>>> >>>>> Here is my updated code after digging through the source code (not >>>>> sure if it is correct ). It sill doesnt work because it says for CSV the >>>>> connector.type should be filesystem not Kafka but documentation says it is >>>>> supported. >>>>> >>>>> >>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; >>>>> import org.apache.flink.runtime.state.StateBackend; >>>>> import >>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >>>>> import org.apache.flink.table.api.DataTypes; >>>>> import org.apache.flink.table.api.EnvironmentSettings; >>>>> import org.apache.flink.table.api.Table; >>>>> import org.apache.flink.table.api.java.StreamTableEnvironment; >>>>> import org.apache.flink.table.descriptors.Csv; >>>>> import org.apache.flink.table.descriptors.Kafka; >>>>> import org.apache.flink.table.descriptors.Schema; >>>>> >>>>> public class Test { >>>>> >>>>> public static void main(String... 
args) throws Exception { >>>>> EnvironmentSettings settings = EnvironmentSettings.newInstance() >>>>> .useBlinkPlanner() >>>>> .inStreamingMode() >>>>> .build(); >>>>> >>>>> StreamExecutionEnvironment streamExecutionEnvironment = >>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>> streamExecutionEnvironment.setStateBackend((StateBackend) new >>>>> RocksDBStateBackend("file:///tmp/rocksdb")); >>>>> StreamTableEnvironment tableEnvironment = >>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings); >>>>> >>>>> tableEnvironment >>>>> .connect( >>>>> new Kafka() >>>>> .version("0.11") >>>>> .topic("test-topic1") >>>>> ) >>>>> .withFormat(new Csv()) >>>>> .withSchema(new Schema().field("f0", DataTypes.STRING())) >>>>> .inAppendMode() >>>>> .createTemporaryTable("kafka_source"); >>>>> >>>>> Table resultTable = tableEnvironment.sqlQuery("select * from >>>>> kafka_source"); >>>>> >>>>> tableEnvironment >>>>> .connect( >>>>> new Kafka() >>>>> .version("0.11") >>>>> .topic("test-topic2") >>>>> ) >>>>> .withFormat(new Csv()) >>>>> .withSchema(new Schema().field("f0", DataTypes.STRING())) >>>>> .inAppendMode() >>>>> .createTemporaryTable("kafka_target"); >>>>> >>>>> tableEnvironment.insertInto("kafka_target", resultTable); >>>>> >>>>> tableEnvironment.execute("Sample Job"); >>>>> } >>>>> } >>>>> >>>>> >>>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hi Benchao, >>>>>> >>>>>> Agreed a ConsoleSink is very useful but that is not the only problem >>>>>> here. Documentation says use tableEnv.registerTableSink all over the >>>>>> place >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink >>>>>> however that function is deprecated. so how do I add any other Sink? >>>>>> >>>>>> Thanks! 
>>>>>>
>>>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi kant,
>>>>>>>
>>>>>>> AFAIK, there is no "print to stdout" sink for Table API now; you can
>>>>>>> implement one custom sink following this doc [1].
>>>>>>>
>>>>>>> IMO, an out-of-box print table sink is very useful, and I've created
>>>>>>> an issue [2] to track this.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>>>>>
>>>>>>> kant kodali <kanth...@gmail.com> wrote on Sunday, March 1, 2020 at 2:30 AM:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks for the pointer. Looks like the documentation says to use
>>>>>>>> tableEnv.registerTableSink, however in my IDE it shows the method is
>>>>>>>> deprecated in Flink 1.10. So I am still not seeing a way to add a sink
>>>>>>>> that can print to stdout. What sink should I use to print to stdout,
>>>>>>>> and how do I add it without converting into DataStream?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked
>>>>>>>>> @Internal. It’s not part of any public API.
>>>>>>>>>
>>>>>>>>> You don’t have to convert DataStream into Table to read from Kafka
>>>>>>>>> in Table API. I guess you could, if you had used DataStream API’s
>>>>>>>>> FlinkKafkaConsumer as it’s documented here [1].
>>>>>>>>>
>>>>>>>>> But you should be able to use Kafka Table connector directly, as
>>>>>>>>> it is described in the docs [2][3].
>>>>>>>>>
>>>>>>>>> Piotrek
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>>>>>> [3]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>>>>>
>>>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Also why do I need to convert to DataStream to print the rows of a
>>>>>>>>> table? Why not have a print method in the Table itself?
>>>>>>>>>
>>>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> Do I need to use DataStream API or Table API to construct
>>>>>>>>>> sources? I am just trying to read from Kafka and print it to the
>>>>>>>>>> console. And yes, I tried it with datastreams and it works fine, but
>>>>>>>>>> I want to do it using Table related APIs. I don't see any
>>>>>>>>>> documentation or a sample on how to create a Kafka table source or
>>>>>>>>>> any other source using Table Source APIs, so after some digging I
>>>>>>>>>> wrote the following code. My ultimate goal is to avoid the Datastream
>>>>>>>>>> API as much as possible and just use Table API & SQL, but somehow I
>>>>>>>>>> feel the Flink framework focuses more on DataStream than on the SQL
>>>>>>>>>> interface. Am I wrong? From the user perspective wouldn't it make
>>>>>>>>>> more sense to focus on SQL interfaces for both streaming and batch?
>>>>>>>>>>
>>>>>>>>>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>>>>>> import org.apache.flink.types.Row;
>>>>>>>>>>
>>>>>>>>>> import java.io.IOException;
>>>>>>>>>> import java.util.Properties;
>>>>>>>>>>
>>>>>>>>>> public class Test {
>>>>>>>>>>
>>>>>>>>>>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>>>>>>>>>         @Override
>>>>>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>>>>>             return Row.of(new String(message));
>>>>>>>>>>         }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>>>         Test test = new Test();
>>>>>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>>>                 .inStreamingMode()
>>>>>>>>>>                 .build();
>>>>>>>>>>
>>>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>>>>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>         StreamTableEnvironment tableEnvironment =
>>>>>>>>>>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>>>>
>>>>>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>>>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>>>>>>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>>>>>>>>
>>>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>>>>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>>
>>>>>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>>>>>         TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>>>>>>>>>         Properties properties = new Properties();
>>>>>>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>>>>>         properties.setProperty("group.id", "test");
>>>>>>>>>>         return new KafkaTableSource(tableSchema, "test-topic1", properties, new MyDeserializationSchema());
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> I get the following error
>>>>>>>>>>
>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>
>>>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not
>>>>>>>>>> serializable. The object probably contains or references non
>>>>>>>>>> serializable fields.
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>>>>>> Test.main(Test.java:40)
>>>>>>>>>>
>>>>>>>>>> The error seems to be on the line
>>>>>>>>>>
>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>>
>>>>>>>>>> and I am not sure why it is not able to serialize?
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Benchao Li
>>>>>>> School of Electronics Engineering and Computer Science, Peking University
>>>>>>> Tel:+86-15650713730
>>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>>>>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
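
On the question of why the consumer could not be serialized: one plausible cause, inferred from the posted code and not confirmed anywhere in the thread, is that `MyDeserializationSchema` is declared as a non-static inner class of `Test`. An instance of such a class carries a hidden reference to its enclosing `Test` instance, and `Test` does not implement `Serializable`. The plain-Java sketch below reproduces that effect without Flink; the class names `InnerClassSerializationSketch`, `InnerSchema` and `NestedSchema` are hypothetical and chosen only for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// The outer class is deliberately NOT Serializable, like `Test` in the thread.
class InnerClassSerializationSketch {

    private final String label = "outer";

    // Non-static inner class: each instance is tied to an outer instance.
    class InnerSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        // Touching an outer field makes the dependency on the outer instance explicit.
        String describe() {
            return "schema of " + label;
        }
    }

    // Static nested class: no reference to any outer instance.
    static class NestedSchema implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Returns true if Java serialization accepts the object, false otherwise.
    static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        InnerClassSerializationSketch outer = new InnerClassSerializationSketch();
        System.out.println(isSerializable(outer.new InnerSchema())); // false
        System.out.println(isSerializable(new NestedSchema()));      // true
    }
}
```

If this is indeed what happened, declaring the schema as a `static` nested class (or a top-level class) would be the usual way to drop the reference to the enclosing instance.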