I'm not familiar with Gradle, but in Maven, when you package dependencies into a single fat jar you have to specify how the SPI service files (the files under META-INF/services) are merged, for example:
<transformers>
  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>

Could you check that your final jar contains the correct resource file?

On Sun, Mar 1, 2020 at 5:25 PM godfrey he <godfre...@gmail.com> wrote:

> I think you should use `flink-sql-connector-kafka-0.11_2.11` instead of
> `flink-connector-kafka_2.11`.
>
> Bests,
> Godfrey
>
> On Sun, Mar 1, 2020 at 5:15 PM kant kodali <kanth...@gmail.com> wrote:
>
>> The dependency was already there. Below is my build.gradle. I also
>> checked the Kafka version, and it looks like the jar
>>
>> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>
>> downloads kafka-clients version 2.2.0. So I changed my code to version
>> 2.2.0, and the same problem persists.
>>
>> buildscript {
>>     repositories {
>>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>>     }
>>     dependencies {
>>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>>     }
>> }
>>
>> plugins {
>>     id 'java'
>>     id 'application'
>> }
>>
>> mainClassName = 'Test'
>> apply plugin: 'com.github.johnrengelman.shadow'
>>
>> ext {
>>     javaVersion = '1.8'
>>     flinkVersion = '1.10.0'
>>     scalaBinaryVersion = '2.11'
>>     slf4jVersion = '1.7.7'
>>     log4jVersion = '1.2.17'
>> }
>>
>> sourceCompatibility = javaVersion
>> targetCompatibility = javaVersion
>> tasks.withType(JavaCompile) {
>>     options.encoding = 'UTF-8'
>> }
>>
>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>
>> // declare where to find the dependencies of your project
>> repositories {
>>     mavenCentral()
>>     maven { url "https://repository.apache.org/content/repositories/snapshots/" }
>> }
>>
>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
>> // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
>> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
>> // -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!
>> configurations {
>>     flinkShadowJar // dependencies which go into the shadowJar
>>
>>     // always exclude these (also from transitive dependencies) since they are provided by Flink
>>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>>     flinkShadowJar.exclude group: 'org.slf4j'
>>     flinkShadowJar.exclude group: 'log4j'
>> }
>>
>> // declare the dependencies for your production and test code
>> dependencies {
>>     // --------------------------------------------------------------
>>     // Compile-time dependencies that should NOT be part of the
>>     // shadow jar and are provided in the lib folder of Flink
>>     // --------------------------------------------------------------
>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>
>>     // --------------------------------------------------------------
>>     // Dependencies that should be part of the shadow jar, e.g.
>>     // connectors. These must be in the flinkShadowJar configuration!
>>     // --------------------------------------------------------------
>>
>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>     flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
>>
>>     flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>     flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>>     compileOnly "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>>     compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>>     flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>>     flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>>
>>     compile "log4j:log4j:${log4jVersion}"
>>     compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>>
>>     // Add test dependencies here.
>>     // testCompile "junit:junit:4.12"
>>     testImplementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>>     testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>> }
>>
>> // make compileOnly dependencies available for tests:
>> sourceSets {
>>     main.compileClasspath += configurations.flinkShadowJar
>>     main.runtimeClasspath += configurations.flinkShadowJar
>>
>>     test.compileClasspath += configurations.flinkShadowJar
>>     test.runtimeClasspath += configurations.flinkShadowJar
>>
>>     javadoc.classpath += configurations.flinkShadowJar
>> }
>>
>> run.classpath = sourceSets.main.runtimeClasspath
>>
>> jar {
>>     manifest {
>>         attributes 'Built-By': System.getProperty('user.name'),
>>                 'Build-Jdk': System.getProperty('java.version')
>>     }
>> }
>>
>> shadowJar {
>>     configurations = [project.configurations.flinkShadowJar]
>> }
>>
>> On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com> wrote:
>>
>>> hi kant,
>>>
>>> > Also why do I need to convert to DataStream to print the rows of a table?
>>> > Why not have a print method in the Table itself?
>>>
>>> Flink 1.10 introduces a utility class named TableUtils to convert a
>>> Table to List<Row>. This utility class is mainly used for demonstration or
>>> testing and is only applicable for *small batch jobs* and small finite
>>> *append-only stream jobs*. Code like:
>>>
>>> Table table = tEnv.sqlQuery("select ...");
>>> List<Row> result = TableUtils.collectToList(table);
>>> result.....
>>>
>>> Currently, we are planning to implement Table#collect [1]; after
>>> that, Table#head and Table#print may also be introduced soon.
>>>
>>> > The program finished with the following exception:
>>>
>>> Please make sure that the Kafka version in the Test class and the Kafka
>>> version in the pom dependency are the same. I tested your code successfully.
>>>
>>> Bests,
>>> Godfrey
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-14807
>>>
>>> On Sun, Mar 1, 2020 at 4:44 PM Benchao Li <libenc...@gmail.com> wrote:
>>>
>>>> Hi kant,
>>>>
>>>> The CSV format is an independent module; you need to add it as a
>>>> dependency.
>>>>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-csv</artifactId>
>>>>   <version>${flink.version}</version>
>>>> </dependency>
>>>>
>>>> On Sun, Mar 1, 2020 at 3:43 PM kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> ------------------------------------------------------------
>>>>> The program finished with the following exception:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>>> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>>> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>>>> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>>>> at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>>>> Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>>>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>>>>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>>>>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>>>>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>>>>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>>>>> at Test.main(Test.java:34)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>>> ... 8 more
>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>>>> Could not find a suitable table factory for
>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>>>> the classpath.
>>>>>
>>>>> Reason: Required context properties mismatch.
>>>>>
>>>>> The matching candidates:
>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>> Mismatched properties:
>>>>> 'connector.type' expects 'filesystem', but is 'kafka'
>>>>>
>>>>> The following properties are requested:
>>>>> connector.property-version=1
>>>>> connector.topic=test-topic1
>>>>> connector.type=kafka
>>>>> connector.version=0.11
>>>>> format.property-version=1
>>>>> format.type=csv
>>>>> schema.0.data-type=VARCHAR(2147483647)
>>>>> schema.0.name=f0
>>>>> update-mode=append
>>>>>
>>>>> The following factories have been considered:
>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>> at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>>>> at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>>>> at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>>> at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>>>>> ... 34 more
>>>>>
>>>>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> > Thanks for the pointer. Looks like the documentation says to use
>>>>>> > tableEnv.registerTableSink however in my IDE it shows the method is
>>>>>> > deprecated in Flink 1.10.
>>>>>>
>>>>>> It looks like not all of the documentation was updated after methods
>>>>>> were deprecated.
>>>>>> However, if you look at the Javadoc of the `registerTableSink`
>>>>>> method, you can find an answer [1].
>>>>>>
>>>>>> > It still doesn't work because it says for CSV the connector.type
>>>>>> > should be filesystem, not Kafka.
>>>>>>
>>>>>> Can you post the full stack trace? As I'm not familiar with the Table
>>>>>> API, maybe Timo or Dawid know what's going on here?
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>>>>>>
>>>>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>> Here is my updated code after digging through the source code (not
>>>>>> sure if it is correct). It still doesn't work because it says for CSV the
>>>>>> connector.type should be filesystem, not Kafka, but the documentation says
>>>>>> it is supported.
>>>>>>
>>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>>> import org.apache.flink.runtime.state.StateBackend;
>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>> import org.apache.flink.table.api.Table;
>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>> import org.apache.flink.table.descriptors.Csv;
>>>>>> import org.apache.flink.table.descriptors.Kafka;
>>>>>> import org.apache.flink.table.descriptors.Schema;
>>>>>>
>>>>>> public class Test {
>>>>>>
>>>>>>     public static void main(String... args) throws Exception {
>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>                 .useBlinkPlanner()
>>>>>>                 .inStreamingMode()
>>>>>>                 .build();
>>>>>>
>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>         streamExecutionEnvironment.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>>>>>>         StreamTableEnvironment tableEnvironment =
>>>>>>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>
>>>>>>         tableEnvironment
>>>>>>             .connect(
>>>>>>                 new Kafka()
>>>>>>                     .version("0.11")
>>>>>>                     .topic("test-topic1")
>>>>>>             )
>>>>>>             .withFormat(new Csv())
>>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>>             .inAppendMode()
>>>>>>             .createTemporaryTable("kafka_source");
>>>>>>
>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>>
>>>>>>         tableEnvironment
>>>>>>             .connect(
>>>>>>                 new Kafka()
>>>>>>                     .version("0.11")
>>>>>>                     .topic("test-topic2")
>>>>>>             )
>>>>>>             .withFormat(new Csv())
>>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>>             .inAppendMode()
>>>>>>             .createTemporaryTable("kafka_target");
>>>>>>
>>>>>>         tableEnvironment.insertInto("kafka_target", resultTable);
>>>>>>
>>>>>>         tableEnvironment.execute("Sample Job");
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Benchao,
>>>>>>>
>>>>>>> Agreed, a ConsoleSink is very useful, but that is not the only problem
>>>>>>> here. The documentation says to use tableEnv.registerTableSink all over
>>>>>>> the place
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>>>>>>> however that function is deprecated. So how do I add any other sink?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi kant,
>>>>>>>>
>>>>>>>> AFAIK, there is no "print to stdout" sink for the Table API at the
>>>>>>>> moment; you can implement a custom sink following this doc [1].
>>>>>>>>
>>>>>>>> IMO, an out-of-the-box print table sink is very useful, and I've
>>>>>>>> created an issue [2] to track this.
>>>>>>>>
>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>>>>>>
>>>>>>>> On Sun, Mar 1, 2020 at 2:30 AM kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Thanks for the pointer. Looks like the documentation says to use
>>>>>>>>> tableEnv.registerTableSink, however in my IDE it shows the method is
>>>>>>>>> deprecated in Flink 1.10. So I am still not seeing a way to add a
>>>>>>>>> sink that can print to stdout. What sink should I use to print to
>>>>>>>>> stdout, and how do I add it without converting into a DataStream?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> You shouldn't be using `KafkaTableSource` as it's marked
>>>>>>>>>> @Internal. It's not part of any public API.
>>>>>>>>>>
>>>>>>>>>> You don't have to convert a DataStream into a Table to read from
>>>>>>>>>> Kafka in the Table API. I guess you could, if you had used the
>>>>>>>>>> DataStream API's FlinkKafkaConsumer as it's documented here [1].
>>>>>>>>>>
>>>>>>>>>> But you should be able to use the Kafka Table connector directly, as
>>>>>>>>>> it is described in the docs [2][3].
>>>>>>>>>>
>>>>>>>>>> Piotrek
>>>>>>>>>>
>>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>>>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>>>>>>
>>>>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Also why do I need to convert to DataStream to print the rows of
>>>>>>>>>> a table? Why not have a print method in the Table itself?
>>>>>>>>>>
>>>>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> Do I need to use the DataStream API or the Table API to construct
>>>>>>>>>>> sources? I am just trying to read from Kafka and print it to the
>>>>>>>>>>> console. And yes, I tried it with DataStreams and it works fine, but
>>>>>>>>>>> I want to do it using the Table-related APIs. I don't see any
>>>>>>>>>>> documentation or a sample on how to create a Kafka table source or
>>>>>>>>>>> any other source using the Table Source APIs, so after some digging
>>>>>>>>>>> I wrote the following code. My ultimate goal is to avoid the
>>>>>>>>>>> DataStream API as much as possible and just use the Table API & SQL,
>>>>>>>>>>> but somehow I feel the Flink framework focuses more on DataStream
>>>>>>>>>>> than on the SQL interface. Am I wrong? From the user perspective,
>>>>>>>>>>> wouldn't it make more sense to focus on SQL interfaces for both
>>>>>>>>>>> streaming and batch?
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>>>>>>> import org.apache.flink.types.Row;
>>>>>>>>>>>
>>>>>>>>>>> import java.io.IOException;
>>>>>>>>>>> import java.util.Properties;
>>>>>>>>>>>
>>>>>>>>>>> public class Test {
>>>>>>>>>>>
>>>>>>>>>>>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>>>>>>             return Row.of(new String(message));
>>>>>>>>>>>         }
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>>>>         Test test = new Test();
>>>>>>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>>>>                 .inStreamingMode()
>>>>>>>>>>>                 .build();
>>>>>>>>>>>
>>>>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>>>>>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>         StreamTableEnvironment tableEnvironment =
>>>>>>>>>>>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>>>>>
>>>>>>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>>>>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>>>>>>>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>>>>>>>>>
>>>>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>>>>>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>>>
>>>>>>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>>>>>>         TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>>>>>>>>>>         Properties properties = new Properties();
>>>>>>>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>>>>>>         properties.setProperty("group.id", "test");
>>>>>>>>>>>         return new KafkaTableSource(tableSchema, "test-topic1", properties, new MyDeserializationSchema());
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> I get the following error
>>>>>>>>>>>
>>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>>
>>>>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not
>>>>>>>>>>> serializable. The object probably contains or references non
>>>>>>>>>>> serializable fields.
>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>>>>>>> Test.main(Test.java:40)
>>>>>>>>>>>
>>>>>>>>>>> The error seems to be on the line
>>>>>>>>>>>
>>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>>>
>>>>>>>>>>> and I am not sure why it is not able to serialize?
>>>>>>>>>>>
>>>>>>>>>>> Thanks!

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
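
P.S. A minimal sketch for the check suggested at the top of this thread (whether the final jar contains the correct resource file). It assumes that Flink 1.10 discovers table factories through the Java SPI file `META-INF/services/org.apache.flink.table.factories.TableFactory`; the class name `ServiceFileCheck` and the method `readServiceEntries` are made up for illustration and are not part of Flink. It only lists what the jar registers; it does not validate the factories themselves.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;

public class ServiceFileCheck {

    // Assumption: the SPI file through which Flink 1.10 looks up table factories.
    public static final String TABLE_FACTORY_SERVICE =
            "META-INF/services/org.apache.flink.table.factories.TableFactory";

    /**
     * Returns the lines of the given jar entry that are neither blank nor
     * start with '#'. Returns an empty list if the entry does not exist.
     */
    public static List<String> readServiceEntries(Path jarPath, String entryName) throws IOException {
        List<String> factories = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath.toFile())) {
            ZipEntry entry = jar.getEntry(entryName);
            if (entry == null) {
                return factories; // service file missing from the jar
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String trimmed = line.trim();
                    if (!trimmed.isEmpty() && !trimmed.startsWith("#")) {
                        factories.add(trimmed);
                    }
                }
            }
        }
        return factories;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java ServiceFileCheck <path-to-fat-jar>");
            return;
        }
        List<String> factories = readServiceEntries(Paths.get(args[0]), TABLE_FACTORY_SERVICE);
        if (factories.isEmpty()) {
            System.out.println("No table factories registered in " + args[0]);
        } else {
            factories.forEach(System.out::println);
        }
    }
}
```

Run it as `java ServiceFileCheck <path-to-fat-jar>`. If no Kafka table factory shows up in the output, the service files were probably overwritten rather than merged while the fat jar was built. For the Gradle Shadow plugin, the `mergeServiceFiles()` option of the `shadowJar` task appears to be the counterpart of Maven's ServicesResourceTransformer.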