The dependency was already there. Below is my build.gradle. I also checked the Kafka version, and it looks like the jar

flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"

downloads kafka-clients version 2.2.0. So I changed my code to version 2.2.0, and the same problem persists.

buildscript {
    repositories {
        jcenter() // this applies only to the Gradle 'Shadow' plugin
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
    }
}

plugins {
    id 'java'
    id 'application'
}

mainClassName = 'Test'
apply plugin: 'com.github.johnrengelman.shadow'

ext {
    javaVersion = '1.8'
    flinkVersion = '1.10.0'
    scalaBinaryVersion = '2.11'
    slf4jVersion = '1.7.7'
    log4jVersion = '1.2.17'
}

sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
    options.encoding = 'UTF-8'
}

applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]

// declare where to find the dependencies of your project
repositories {
    mavenCentral()
    maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}

// NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
// in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
// shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
// -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!
configurations {
    flinkShadowJar // dependencies which go into the shadowJar

    // always exclude these (also from transitive dependencies) since they are provided by Flink
    flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
    flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
    flinkShadowJar.exclude group: 'org.slf4j'
    flinkShadowJar.exclude group: 'log4j'
}

// declare the dependencies for your production and test code
dependencies {
    // --------------------------------------------------------------
    // Compile-time dependencies that should NOT be part of the
    // shadow jar and are provided in the lib folder of Flink
    // --------------------------------------------------------------
    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"

    // --------------------------------------------------------------
    // Dependencies that should be part of the shadow jar, e.g.
    // connectors. These must be in the flinkShadowJar configuration!
    // --------------------------------------------------------------
    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
    compileOnly "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
    compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"

    compile "log4j:log4j:${log4jVersion}"
    compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"

    // Add test dependencies here.
    // testCompile "junit:junit:4.12"
    testImplementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
    testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
}

// make compileOnly dependencies available for tests:
sourceSets {
    main.compileClasspath += configurations.flinkShadowJar
    main.runtimeClasspath += configurations.flinkShadowJar

    test.compileClasspath += configurations.flinkShadowJar
    test.runtimeClasspath += configurations.flinkShadowJar

    javadoc.classpath += configurations.flinkShadowJar
}

run.classpath = sourceSets.main.runtimeClasspath

jar {
    manifest {
        attributes 'Built-By': System.getProperty('user.name'),
                   'Build-Jdk': System.getProperty('java.version')
    }
}

shadowJar {
    configurations = [project.configurations.flinkShadowJar]
}

On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com> wrote:

> hi kant,
>
> > Also why do I need to convert to DataStream to print the rows of a
> > table? Why not have a print method in the Table itself?
> Flink 1.10 introduces a utility class named TableUtils to convert a Table
> to List<Row>. This utility class is mainly used for demonstration or
> testing and is only applicable to *small batch jobs* and small finite
> *append only stream jobs*. Code like:
> Table table = tEnv.sqlQuery("select ...");
> List<Row> result = TableUtils.collectToList(table);
> result.....
>
> Currently, we are planning to implement Table#collect[1]; after
> that, Table#head and Table#print may also be introduced soon.
>
> > The program finished with the following exception:
> Please make sure that the Kafka version in the Test class and the Kafka
> version in the pom dependency are the same. I tested your code successfully.
>
> Bests,
> Godfrey
>
> [1] https://issues.apache.org/jira/browse/FLINK-14807
>
>
> Benchao Li <libenc...@gmail.com> wrote on Sun, Mar 1, 2020 at 4:44 PM:
>
>> Hi kant,
>>
>> CSV format is an independent module, you need to add it as your
>> dependency.
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-csv</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>>
>> kant kodali <kanth...@gmail.com> wrote on Sun, Mar 1, 2020 at 3:43 PM:
>>
>>> ------------------------------------------------------------
>>> The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>> Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>>>     at Test.main(Test.java:34)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>     ... 8 more
>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>> the classpath.
>>>
>>> Reason: Required context properties mismatch.
>>>
>>> The matching candidates:
>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>> Mismatched properties:
>>> 'connector.type' expects 'filesystem', but is 'kafka'
>>>
>>> The following properties are requested:
>>> connector.property-version=1
>>> connector.topic=test-topic1
>>> connector.type=kafka
>>> connector.version=0.11
>>> format.property-version=1
>>> format.type=csv
>>> schema.0.data-type=VARCHAR(2147483647)
>>> schema.0.name=f0
>>> update-mode=append
>>>
>>> The following factories have been considered:
>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>     at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>>     at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>>     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>>>     ... 34 more
>>>
>>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> > Thanks for the pointer. Looks like the documentation says to use
>>>> > tableEnv.registerTableSink however in my IDE it shows the method is
>>>> > deprecated in Flink 1.10.
>>>>
>>>> It looks like not all of the documentation was updated after methods
>>>> were deprecated. However, if you look at the Javadocs of the
>>>> `registerTableSink` method, you can find an answer [1].
>>>>
>>>> > It still doesn't work because it says for CSV the connector.type
>>>> > should be filesystem not Kafka.
>>>>
>>>> Can you post the full stack trace? As I’m not familiar with the Table
>>>> API, maybe you Timo or Dawid know what’s going on here?
>>>>
>>>> Piotrek
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>>>>
>>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>> Here is my updated code after digging through the source code (not sure
>>>> if it is correct). It still doesn't work because it says that for CSV the
>>>> connector.type should be filesystem, not Kafka, but the documentation says
>>>> it is supported.
>>>>
>>>>
>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>> import org.apache.flink.runtime.state.StateBackend;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.table.api.DataTypes;
>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>> import org.apache.flink.table.api.Table;
>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>> import org.apache.flink.table.descriptors.Csv;
>>>> import org.apache.flink.table.descriptors.Kafka;
>>>> import org.apache.flink.table.descriptors.Schema;
>>>>
>>>> public class Test {
>>>>
>>>>     public static void main(String... args) throws Exception {
>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>                 .useBlinkPlanner()
>>>>                 .inStreamingMode()
>>>>                 .build();
>>>>
>>>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         streamExecutionEnvironment.setStateBackend(
>>>>                 (StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>>>>         StreamTableEnvironment tableEnvironment =
>>>>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>
>>>>         tableEnvironment
>>>>                 .connect(
>>>>                         new Kafka()
>>>>                                 .version("0.11")
>>>>                                 .topic("test-topic1")
>>>>                 )
>>>>                 .withFormat(new Csv())
>>>>                 .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>                 .inAppendMode()
>>>>                 .createTemporaryTable("kafka_source");
>>>>
>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>
>>>>         tableEnvironment
>>>>                 .connect(
>>>>                         new Kafka()
>>>>                                 .version("0.11")
>>>>                                 .topic("test-topic2")
>>>>                 )
>>>>                 .withFormat(new Csv())
>>>>                 .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>                 .inAppendMode()
>>>>                 .createTemporaryTable("kafka_target");
>>>>
>>>>         tableEnvironment.insertInto("kafka_target", resultTable);
>>>>
>>>>         tableEnvironment.execute("Sample Job");
>>>>     }
>>>> }
>>>>
>>>>
>>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Hi Benchao,
>>>>>
>>>>> Agreed, a ConsoleSink is very useful, but that is not the only problem
>>>>> here. The documentation says to use tableEnv.registerTableSink all over
>>>>> the place
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>>>>> however that function is deprecated. So how do I add any other sink?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote:
>>>>>
>>>>>> Hi kant,
>>>>>>
>>>>>> AFAIK, there is no "print to stdout" sink for the Table API now; you can
>>>>>> implement a custom sink following this doc[1].
>>>>>>
>>>>>> IMO, an out-of-the-box print table sink is very useful, and I've created
>>>>>> an issue[2] to track this.
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>>>>
>>>>>> kant kodali <kanth...@gmail.com> wrote on Sun, Mar 1, 2020 at 2:30 AM:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks for the pointer. It looks like the documentation says to use
>>>>>>> tableEnv.registerTableSink, however my IDE shows the method is
>>>>>>> deprecated in Flink 1.10, so I am still not seeing a way to add a sink
>>>>>>> that can print to stdout. What sink should I use to print to stdout,
>>>>>>> and how do I add it without converting into a DataStream?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal.
>>>>>>>> It’s not part of any public API.
>>>>>>>>
>>>>>>>> You don’t have to convert DataStream into Table to read from Kafka
>>>>>>>> in Table API. I guess you could, if you had used DataStream API’s
>>>>>>>> FlinkKafkaConsumer as it’s documented here [1].
>>>>>>>>
>>>>>>>> But you should be able to use Kafka Table connector directly, as it
>>>>>>>> is described in the docs [2][3].
>>>>>>>>
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>>>>
>>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Also why do I need to convert to DataStream to print the rows of a
>>>>>>>> table? Why not have a print method in the Table itself?
>>>>>>>>
>>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> Do I need to use the DataStream API or the Table API to construct sources?
>>>>>>>>> I am just trying to read from Kafka and print it to the console. And yes, I
>>>>>>>>> tried it with DataStreams and it works fine, but I want to do it using
>>>>>>>>> Table-related APIs. I don't see any documentation or a sample on how to
>>>>>>>>> create a Kafka table source or any other source using the Table Source
>>>>>>>>> APIs, so after some digging I wrote the following code. My ultimate goal
>>>>>>>>> is to avoid the DataStream API as much as possible and just use the Table
>>>>>>>>> API & SQL, but somehow I feel the Flink framework focuses more on
>>>>>>>>> DataStream than on the SQL interface. Am I wrong? From the user
>>>>>>>>> perspective, wouldn't it make more sense to focus on SQL interfaces for
>>>>>>>>> both streaming and batch?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>>>>> import org.apache.flink.types.Row;
>>>>>>>>>
>>>>>>>>> import java.io.IOException;
>>>>>>>>> import java.util.Properties;
>>>>>>>>>
>>>>>>>>> public class Test {
>>>>>>>>>
>>>>>>>>>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>>>>>>>>         @Override
>>>>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>>>>             return Row.of(new String(message));
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>>         Test test = new Test();
>>>>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>>                 .inStreamingMode()
>>>>>>>>>                 .build();
>>>>>>>>>
>>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>>>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>         StreamTableEnvironment tableEnvironment =
>>>>>>>>>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>>>
>>>>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>>>>>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>>>>>>>
>>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>>>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>
>>>>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>>>>         TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>>>>>>>>         Properties properties = new Properties();
>>>>>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>>>>         properties.setProperty("group.id", "test");
>>>>>>>>>         return new KafkaTableSource(tableSchema, "test-topic1", properties, new MyDeserializationSchema());
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I get the following error
>>>>>>>>>
>>>>>>>>> The program finished with the following exception:
>>>>>>>>>
>>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not
>>>>>>>>> serializable. The object probably contains or references non serializable
>>>>>>>>> fields.
>>>>>>>>>
>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>>>>> Test.main(Test.java:40)
>>>>>>>>>
>>>>>>>>> The error seems to be on the line
>>>>>>>>>
>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>
>>>>>>>>> and I am not sure why it is not able to serialize?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Benchao Li
>>>>>> School of Electronics Engineering and Computer Science, Peking University
>>>>>> Tel:+86-15650713730
>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>>>
>>>>>>
>>>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>
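
On the "not serializable" error in the original question: nothing in this thread identifies the cause. One plausible explanation, stated here as an assumption, is that MyDeserializationSchema is declared as a non-static inner class of Test, so every instance holds a hidden reference to a Test instance, and Test is not Serializable. The sketch below reproduces that mechanism with plain JDK serialization only. InnerClassSerializationDemo, InnerSchema, NestedSchema and canSerialize are hypothetical names chosen for illustration, and no Flink classes are involved.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the Test class in the thread; it is NOT Serializable.
class InnerClassSerializationDemo {

    private final String topic = "test-topic1";

    // Non-static inner class: it reads a field of the enclosing instance,
    // so each InnerSchema keeps a hidden reference to that instance.
    class InnerSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        String describe() {
            return "schema for " + topic;
        }
    }

    // Static nested class: no reference to an enclosing instance.
    static class NestedSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        String describe() {
            return "schema without outer reference";
        }
    }

    // Returns true if plain Java serialization of obj succeeds.
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when obj, or something it references, is not Serializable.
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        InnerClassSerializationDemo outer = new InnerClassSerializationDemo();
        // The inner instance drags the non-serializable outer instance along.
        System.out.println("inner class serializable:  " + canSerialize(outer.new InnerSchema()));
        // The static nested instance has nothing else to serialize.
        System.out.println("nested class serializable: " + canSerialize(new NestedSchema()));
    }
}
```

Running main reports false for the inner class and true for the static nested class. If the same mechanism is at work in Test, declaring MyDeserializationSchema as a static nested class (or a top-level class) would be the corresponding change; that remains untested against the Flink job in this thread.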