I don't know how gradle works, but in Maven, packaging dependencies into
one fat jar needs to specify how SPI property files should be dealt with,
like

<transformers>
    <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>

Could you check that your final jar contains correct resource file?

godfrey he <godfre...@gmail.com> 于2020年3月1日周日 下午5:25写道:

> I think you should use `flink-sql-connector-kafka*-0.11*_2.11` instead of
> `flink-connector-kafka_2.11`.
>
> Bests,
> Godfrey
>
> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午5:15写道:
>
>> The dependency was already there. Below is my build.gradle. Also I
>> checked the kafka version and looks like the jar
>>
>> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>
>> downloads kafka-clients version 2.2.0. So I changed my code to version
>> 2.2.0 and same problem persists.
>>
>> buildscript {
>>     repositories {
>>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>>     }
>>     dependencies {
>>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>>     }
>> }
>>
>> plugins {
>>     id 'java'
>>     id 'application'
>> }
>>
>> mainClassName = 'Test'
>> apply plugin: 'com.github.johnrengelman.shadow'
>>
>> ext {
>>     javaVersion = '1.8'
>>     flinkVersion = '1.10.0'
>>     scalaBinaryVersion = '2.11'
>>     slf4jVersion = '1.7.7'
>>     log4jVersion = '1.2.17'
>> }
>>
>>
>> sourceCompatibility = javaVersion
>> targetCompatibility = javaVersion
>> tasks.withType(JavaCompile) {
>>     options.encoding = 'UTF-8'
>> }
>>
>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>
>> // declare where to find the dependencies of your project
>> repositories {
>>     mavenCentral()
>>     maven { url 
>> "https://repository.apache.org/content/repositories/snapshots/"; }
>> }
>>
>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then 
>> we could not run code
>> // in the IDE or with "gradle run". We also cannot exclude transitive 
>> dependencies from the
>> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
>> // -> Explicitly define the // libraries we want to be included in the 
>> "flinkShadowJar" configuration!
>> configurations {
>>     flinkShadowJar // dependencies which go into the shadowJar
>>
>>     // always exclude these (also from transitive dependencies) since they 
>> are provided by Flink
>>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 
>> 'jsr305'
>>     flinkShadowJar.exclude group: 'org.slf4j'
>>     flinkShadowJar.exclude group: 'log4j'
>> }
>>
>> // declare the dependencies for your production and test code
>> dependencies {
>>     // --------------------------------------------------------------
>>     // Compile-time dependencies that should NOT be part of the
>>     // shadow jar and are provided in the lib folder of Flink
>>     // --------------------------------------------------------------
>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>     compile 
>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>
>>     // --------------------------------------------------------------
>>     // Dependencies that should be part of the shadow jar, e.g.
>>     // connectors. These must be in the flinkShadowJar configuration!
>>     // --------------------------------------------------------------
>>
>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>     compile 
>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>     flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
>>
>>     flinkShadowJar 
>> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>     flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>>     compileOnly 
>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>>     compileOnly 
>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>>     flinkShadowJar 
>> "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>>     flinkShadowJar 
>> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>>
>>
>>     compile "log4j:log4j:${log4jVersion}"
>>     compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>>
>>     // Add test dependencies here.
>>     // testCompile "junit:junit:4.12"
>>     testImplementation 
>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>>     testImplementation 
>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>> }
>>
>> // make compileOnly dependencies available for tests:
>> sourceSets {
>>     main.compileClasspath += configurations.flinkShadowJar
>>     main.runtimeClasspath += configurations.flinkShadowJar
>>
>>     test.compileClasspath += configurations.flinkShadowJar
>>     test.runtimeClasspath += configurations.flinkShadowJar
>>
>>     javadoc.classpath += configurations.flinkShadowJar
>> }
>>
>> run.classpath = sourceSets.main.runtimeClasspath
>>
>> jar {
>>     manifest {
>>         attributes 'Built-By': System.getProperty('user.name'),
>>                 'Build-Jdk': System.getProperty('java.version')
>>     }
>> }
>>
>> shadowJar {
>>     configurations = [project.configurations.flinkShadowJar]
>> }
>>
>>
>>
>>
>> On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com> wrote:
>>
>>> hi kant,
>>>
>>> > Also why do I need to convert to DataStream to print the rows of a
>>> table? Why not have a print method in the Table itself?
>>> Flink 1.10 introduces a utility class named TableUtils to convert a
>>> Table to List<Row>, this utility class is mainly used for demonstration or
>>> testing and is only applicable for *small batch jobs* and small finite 
>>> *append
>>> only stream jobs*.  code like:
>>> Table table = tEnv.sqlQuery("select ...");
>>> List<Row> result = TableUtils.collectToList(table);
>>> result.....
>>>
>>> currently, we are planner to implement Table#collect[1], after
>>> that Table#head and Table#print may be also introduced soon.
>>>
>>> >  The program finished with the following exception:
>>> please make sure that the kafka version in Test class and the kafka
>>> version in pom dependency are same. I tested your code successfully.
>>>
>>> Bests,
>>> Godfrey
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-14807
>>>
>>>
>>> Benchao Li <libenc...@gmail.com> 于2020年3月1日周日 下午4:44写道:
>>>
>>>> Hi kant,
>>>>
>>>> CSV format is an independent module, you need to add it as your
>>>> dependency.
>>>>
>>>> <dependency>
>>>>    <groupId>org.apache.flink</groupId>
>>>>    <artifactId>flink-csv</artifactId>
>>>>    <version>${flink.version}</version>
>>>> </dependency>
>>>>
>>>>
>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午3:43写道:
>>>>
>>>>> ------------------------------------------------------------
>>>>>  The program finished with the following exception:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error: findAndCreateTableSource failed.
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>>> at
>>>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>>> at
>>>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>>> at
>>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>>>> at
>>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>>>> at
>>>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>> findAndCreateTableSource failed.
>>>>> at
>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>>> at
>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>>>>> at
>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>>>>> at
>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>>>>> at
>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>>>>> at
>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>>>>> at
>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>>> at
>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>>> at
>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>>> at
>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>>> at
>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>>> at
>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>>> at
>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>>> at
>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>>> at
>>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>>> at
>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>>> at
>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>>>>> at Test.main(Test.java:34)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>>> ... 8 more
>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>>>> Could not find a suitable table factory for
>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>>>> the classpath.
>>>>>
>>>>> Reason: Required context properties mismatch.
>>>>>
>>>>> The matching candidates:
>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>> Mismatched properties:
>>>>> 'connector.type' expects 'filesystem', but is 'kafka'
>>>>>
>>>>> The following properties are requested:
>>>>> connector.property-version=1
>>>>> connector.topic=test-topic1
>>>>> connector.type=kafka
>>>>> connector.version=0.11
>>>>> format.property-version=1
>>>>> format.type=csv
>>>>> schema.0.data-type=VARCHAR(2147483647)
>>>>> schema.0.name=f0
>>>>> update-mode=append
>>>>>
>>>>> The following factories have been considered:
>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>> at
>>>>> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>>>> at
>>>>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>>>> at
>>>>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>>> at
>>>>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>>> at
>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>>>>> ... 34 more
>>>>>
>>>>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> > Thanks for the pointer. Looks like the documentation says to use
>>>>>> tableEnv.registerTableSink however in my IDE it shows the method is
>>>>>> deprecated in Flink 1.10.
>>>>>>
>>>>>> It looks like not all of the documentation was updated after methods
>>>>>> were deprecated. However if you look at the java docs of the
>>>>>> `registerTableSink` method, you can find an answer [1].
>>>>>>
>>>>>> >  It sill doesnt work because it says for CSV the connector.type
>>>>>> should be filesystem not Kafka.
>>>>>>
>>>>>> Can you post the full stack trace? As I’m not familiar with the Table
>>>>>> API, maybe you Timo or Dawid know what’s going on here?
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>>>>>>
>>>>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>> Here is my updated code after digging through the source code (not
>>>>>> sure if it is correct ). It sill doesnt work because it says for CSV the
>>>>>> connector.type should be filesystem not Kafka but documentation says it 
>>>>>> is
>>>>>> supported.
>>>>>>
>>>>>>
>>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>>> import org.apache.flink.runtime.state.StateBackend;
>>>>>> import 
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>> import org.apache.flink.table.api.Table;
>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>> import org.apache.flink.table.descriptors.Csv;
>>>>>> import org.apache.flink.table.descriptors.Kafka;
>>>>>> import org.apache.flink.table.descriptors.Schema;
>>>>>>
>>>>>> public class Test {
>>>>>>
>>>>>>     public static void main(String... args) throws Exception {
>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>                 .useBlinkPlanner()
>>>>>>                 .inStreamingMode()
>>>>>>                 .build();
>>>>>>
>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment = 
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>         streamExecutionEnvironment.setStateBackend((StateBackend) new 
>>>>>> RocksDBStateBackend("file:///tmp/rocksdb"));
>>>>>>         StreamTableEnvironment tableEnvironment = 
>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>
>>>>>>         tableEnvironment
>>>>>>             .connect(
>>>>>>                 new Kafka()
>>>>>>                     .version("0.11")
>>>>>>                     .topic("test-topic1")
>>>>>>             )
>>>>>>             .withFormat(new Csv())
>>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>>             .inAppendMode()
>>>>>>             .createTemporaryTable("kafka_source");
>>>>>>
>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from 
>>>>>> kafka_source");
>>>>>>
>>>>>>         tableEnvironment
>>>>>>             .connect(
>>>>>>                 new Kafka()
>>>>>>                     .version("0.11")
>>>>>>                     .topic("test-topic2")
>>>>>>             )
>>>>>>             .withFormat(new Csv())
>>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>>             .inAppendMode()
>>>>>>             .createTemporaryTable("kafka_target");
>>>>>>
>>>>>>         tableEnvironment.insertInto("kafka_target", resultTable);
>>>>>>
>>>>>>         tableEnvironment.execute("Sample Job");
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Benchao,
>>>>>>>
>>>>>>> Agreed a ConsoleSink is very useful but that is not the only problem
>>>>>>> here. Documentation says use  tableEnv.registerTableSink all over the 
>>>>>>> place
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>>>>>>> however that function is deprecated. so how do I add any other Sink?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi kant,
>>>>>>>>
>>>>>>>> AFAIK, there is no "print to stdout" sink for Table API now, you
>>>>>>>> can implement one custom sink following this doc[1].
>>>>>>>>
>>>>>>>> IMO, an out-of-box print table sink is very useful, and I've
>>>>>>>> created an issue[2] to track this.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>>>>>>
>>>>>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 上午2:30写道:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Thanks for the pointer. Looks like the documentation says to use
>>>>>>>>> tableEnv.registerTableSink however in my IDE it shows the method is
>>>>>>>>> deprecated in Flink 1.10. so I am still not seeing a way to add a 
>>>>>>>>> sink that
>>>>>>>>> can print to stdout? what sink should I use to print to stdout and 
>>>>>>>>> how do I
>>>>>>>>> add it without converting into DataStream?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <
>>>>>>>>> pi...@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked
>>>>>>>>>> @Internal. It’s not part of any public API.
>>>>>>>>>>
>>>>>>>>>> You don’t have to convert DataStream into Table to read from
>>>>>>>>>> Kafka in Table API. I guess you could, if you had used DataStream 
>>>>>>>>>> API’s
>>>>>>>>>> FlinkKafkaConsumer as it’s documented here [1].
>>>>>>>>>>
>>>>>>>>>> But you should be able to use Kafka Table connector directly, as
>>>>>>>>>> it is described in the docs [2][3].
>>>>>>>>>>
>>>>>>>>>> Piotrek
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>>>>>>> [2]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>>>>>>> [3]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>>>>>>
>>>>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Also why do I need to convert to DataStream to print the rows of
>>>>>>>>>> a table? Why not have a print method in the Table itself?
>>>>>>>>>>
>>>>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> Do I need to use DataStream API or Table API to construct
>>>>>>>>>>> sources? I am just trying to read from Kafka and print it to 
>>>>>>>>>>> console. And
>>>>>>>>>>> yes I tried it with datastreams and it works fine but I want to do 
>>>>>>>>>>> it using
>>>>>>>>>>> Table related APIs. I don't see any documentation or a sample on 
>>>>>>>>>>> how to
>>>>>>>>>>> create Kafka table source or any other source using Table Source 
>>>>>>>>>>> API's so
>>>>>>>>>>> after some digging I wrote the following code. My ultimate goal is 
>>>>>>>>>>> to avoid
>>>>>>>>>>> Datastream API as much as possible and just use Table API & SQL but 
>>>>>>>>>>> somehow
>>>>>>>>>>> I feel the Flink framework focuses on DataStream than the SQL 
>>>>>>>>>>> interface. am
>>>>>>>>>>> I wrong? From the user perspective wouldn't it make more sense to 
>>>>>>>>>>> focus on
>>>>>>>>>>> SQL interfaces for both streaming and batch?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> import 
>>>>>>>>>>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>>>>>>> import 
>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>>>>>>> import org.apache.flink.types.Row;
>>>>>>>>>>>
>>>>>>>>>>> import java.io.IOException;
>>>>>>>>>>> import java.util.Properties;
>>>>>>>>>>>
>>>>>>>>>>> public class Test {
>>>>>>>>>>>
>>>>>>>>>>>     public class MyDeserializationSchema extends 
>>>>>>>>>>> AbstractDeserializationSchema<Row> {
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>>>>>>             return Row.of(new String(message));
>>>>>>>>>>>         }
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>>>>         Test test = new Test();
>>>>>>>>>>>         EnvironmentSettings settings = 
>>>>>>>>>>> EnvironmentSettings.newInstance()
>>>>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>>>>                 .inStreamingMode()
>>>>>>>>>>>                 .build();
>>>>>>>>>>>
>>>>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment = 
>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>         StreamTableEnvironment tableEnvironment = 
>>>>>>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>>>>>
>>>>>>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>>>>>>         Table kafkaTable = 
>>>>>>>>>>> tableEnvironment.fromTableSource(tableSource);
>>>>>>>>>>>         tableEnvironment.createTemporaryView("kafka_source", 
>>>>>>>>>>> kafkaTable);
>>>>>>>>>>>
>>>>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * 
>>>>>>>>>>> from kafka_source");
>>>>>>>>>>>         tableEnvironment.toAppendStream(resultTable, 
>>>>>>>>>>> Row.class).print();
>>>>>>>>>>>
>>>>>>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>>>>>>         TableSchema tableSchema = TableSchema.builder().field("f0", 
>>>>>>>>>>> DataTypes.STRING()).build();
>>>>>>>>>>>         Properties properties = new Properties();
>>>>>>>>>>>         properties.setProperty("bootstrap.servers", 
>>>>>>>>>>> "localhost:9092");
>>>>>>>>>>>         properties.setProperty("group.id", "test");
>>>>>>>>>>>         return new KafkaTableSource(tableSchema, "test-topic1", 
>>>>>>>>>>> properties, new MyDeserializationSchema());
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I get the following error
>>>>>>>>>>>
>>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>>
>>>>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not
>>>>>>>>>>> serializable. The object probably contains or references non 
>>>>>>>>>>> serializable
>>>>>>>>>>> fields.
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>>>>>>>
>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>>
>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>>>>
>>>>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>>>>
>>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>>>>>>> Test.main(Test.java:40)
>>>>>>>>>>>
>>>>>>>>>>> The error seems to be on the line
>>>>>>>>>>>
>>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>>>
>>>>>>>>>>> and I am not sure why it is not able to serialize?
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Benchao Li
>>>>>>>> School of Electronics Engineering and Computer Science, Peking 
>>>>>>>> University
>>>>>>>> Tel:+86-15650713730
>>>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>>> --
>>>>
>>>> Benchao Li
>>>> School of Electronics Engineering and Computer Science, Peking University
>>>> Tel:+86-15650713730
>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>
>>>>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn

Reply via email to