* What went wrong:
Could not determine the dependencies of task ':shadowJar'.
> Could not resolve all dependencies for configuration ':flinkShadowJar'.
   > Could not find org.apache.flink:flink-sql-connector-kafka_2.11:universal.
     Searched in the following locations:
       - https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
       - https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
       - https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
       - https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
     Required by:
         project :
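
The lookup locations in this error follow the standard Maven repository layout (group ID with dots turned into slashes, then artifact ID, then version, then `artifact-version.ext`), so they suggest that "universal" was resolved as the artifact version here. A minimal sketch of that mapping, assuming only the standard layout; the class `MavenPath` and its `url` method are hypothetical helpers for illustration and are not part of Gradle or Flink:

```java
public class MavenPath {
    // Hypothetical helper: builds the URL a resolver would try for
    // group:artifact:version under the standard Maven repository layout.
    public static String url(String repo, String group, String artifact,
                             String version, String ext) {
        String base = repo.endsWith("/") ? repo : repo + "/";
        return base + group.replace('.', '/') + "/" + artifact + "/" + version
                + "/" + artifact + "-" + version + "." + ext;
    }

    public static void main(String[] args) {
        String repo = "https://repo.maven.apache.org/maven2";
        String group = "org.apache.flink";
        String artifact = "flink-sql-connector-kafka_2.11";
        // "universal" used as the artifact version, as in the failing build
        System.out.println(url(repo, group, artifact, "universal", "pom"));
        // the Flink release number (flinkVersion = '1.10.0') used as the artifact version
        System.out.println(url(repo, group, artifact, "1.10.0", "pom"));
    }
}
```

The first printed URL is the same as the first location Gradle searched. My reading is that "universal" belongs in the Kafka descriptor's `version(...)` call in the code, while the dependency itself keeps `${flinkVersion}` as its version.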



On Sun, Mar 1, 2020 at 6:43 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Kant,
>
> If you want to use the *universal* Kafka connector, use "universal"
> for the version. The community decided to no longer distinguish between
> different Kafka connector versions, but to use the newest Kafka client
> version for all versions of Kafka 1.0+. So if you want to use the connector
> from flink-sql-connector-kafka_2.11, use "universal" for the version.
>
> As for the collect/print sink: we do realize the importance of such a sink,
> and there have been a few approaches to implementing one, including the
> TableUtils mentioned by Godfrey. It does not have strong consistency
> guarantees and is recommended only for experiments and testing. There is
> also an ongoing discussion on how to implement such a sink for *both* batch
> and streaming here:
> https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17046455#comment-17046455
>
> Best,
>
> Dawid
> On 01/03/2020 12:00, kant kodali wrote:
>
> Hi Benchao,
>
> That worked! I am pasting the build.gradle file here. However, this only
> works for 0.11, and it needs zookeeper.connect(), which shouldn't be
> required. I am not sure why the Flink Kafka connector requires it. If I
> change the version to 2.2 in the code and specify this jar
>
> flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>
> or
>
> flinkShadowJar "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" // Not sure if I should use this one for Kafka >= 0.11
>
> it doesn't work either.
>
>
> buildscript {
>     repositories {
>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>     }
>     dependencies {
>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>     }
> }
>
> plugins {
>     id 'java'
>     id 'application'
> }
>
> mainClassName = 'Test'
> apply plugin: 'com.github.johnrengelman.shadow'
>
> ext {
>     javaVersion = '1.8'
>     flinkVersion = '1.10.0'
>     scalaBinaryVersion = '2.11'
>     slf4jVersion = '1.7.7'
>     log4jVersion = '1.2.17'
> }
>
> sourceCompatibility = javaVersion
> targetCompatibility = javaVersion
> tasks.withType(JavaCompile) {
>     options.encoding = 'UTF-8'
> }
>
> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>
> // declare where to find the dependencies of your project
> repositories {
>     mavenCentral()
>     maven { url "https://repository.apache.org/content/repositories/snapshots/" }
> }
>
> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
> // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
> // -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!
> configurations {
>     flinkShadowJar // dependencies which go into the shadowJar
>
>     // always exclude these (also from transitive dependencies) since they are provided by Flink
>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>     flinkShadowJar.exclude group: 'org.slf4j'
>     flinkShadowJar.exclude group: 'log4j'
> }
>
> // declare the dependencies for your production and test code
> dependencies {
>     // --------------------------------------------------------------
>     // Compile-time dependencies that should NOT be part of the
>     // shadow jar and are provided in the lib folder of Flink
>     // --------------------------------------------------------------
>     compile "org.apache.flink:flink-java:${flinkVersion}"
>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>
>     // --------------------------------------------------------------
>     // Dependencies that should be part of the shadow jar, e.g.
>     // connectors. These must be in the flinkShadowJar configuration!
>     // --------------------------------------------------------------
>     compile "org.apache.flink:flink-java:${flinkVersion}"
>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>     flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
>
>     // tried doesnt work. same problem
>     //flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>
>     // tried doesnt work. same problem
>     //flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>
>     //tried doesnt work. same problem
>     flinkShadowJar "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}"
>
>     flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>     compileOnly "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>     compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>     flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>     flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>
>     compile "log4j:log4j:${log4jVersion}"
>     compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>
>     // Add test dependencies here.
>     // testCompile "junit:junit:4.12"
>     testImplementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>     testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
> }
>
> // make compileOnly dependencies available for tests:
> sourceSets {
>     main.compileClasspath += configurations.flinkShadowJar
>     main.runtimeClasspath += configurations.flinkShadowJar
>
>     test.compileClasspath += configurations.flinkShadowJar
>     test.runtimeClasspath += configurations.flinkShadowJar
>
>     javadoc.classpath += configurations.flinkShadowJar
> }
>
> run.classpath = sourceSets.main.runtimeClasspath
>
> jar {
>     manifest {
>         attributes 'Built-By': System.getProperty('user.name'),
>                 'Build-Jdk': System.getProperty('java.version')
>     }
> }
>
> shadowJar {
>     configurations = [project.configurations.flinkShadowJar]
>     mergeServiceFiles()
>     manifest {
>         attributes 'Main-Class': mainClassName
>     }
> }
>
>
>
>
>
>
> On Sun, Mar 1, 2020 at 1:38 AM Benchao Li <libenc...@gmail.com> wrote:
>
>> I don't know how Gradle works, but in Maven, packaging dependencies into
>> one fat jar requires specifying how SPI property files should be handled,
>> for example:
>>
>> <transformers>
>>     <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>> </transformers>
>>
>> Could you check that your final jar contains the correct resource files?
>>
>> On Sun, Mar 1, 2020 at 5:25 PM godfrey he <godfre...@gmail.com> wrote:
>>
>>> I think you should use `flink-sql-connector-kafka-0.11_2.11` instead
>>> of `flink-connector-kafka_2.11`.
>>>
>>> Bests,
>>> Godfrey
>>>
>>> On Sun, Mar 1, 2020 at 5:15 PM kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> The dependency was already there. Below is my build.gradle. I also
>>>> checked the Kafka version, and it looks like the jar
>>>>
>>>> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>>>
>>>> downloads kafka-clients version 2.2.0. So I changed my code to version
>>>> 2.2.0, and the same problem persists.
>>>>
>>>> buildscript {
>>>>     repositories {
>>>>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>>>>     }
>>>>     dependencies {
>>>>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>>>>     }
>>>> }
>>>>
>>>> plugins {
>>>>     id 'java'
>>>>     id 'application'
>>>> }
>>>>
>>>> mainClassName = 'Test'
>>>> apply plugin: 'com.github.johnrengelman.shadow'
>>>>
>>>> ext {
>>>>     javaVersion = '1.8'
>>>>     flinkVersion = '1.10.0'
>>>>     scalaBinaryVersion = '2.11'
>>>>     slf4jVersion = '1.7.7'
>>>>     log4jVersion = '1.2.17'
>>>> }
>>>>
>>>> sourceCompatibility = javaVersion
>>>> targetCompatibility = javaVersion
>>>> tasks.withType(JavaCompile) {
>>>>     options.encoding = 'UTF-8'
>>>> }
>>>>
>>>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>>>
>>>> // declare where to find the dependencies of your project
>>>> repositories {
>>>>     mavenCentral()
>>>>     maven { url "https://repository.apache.org/content/repositories/snapshots/" }
>>>> }
>>>>
>>>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
>>>> // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
>>>> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
>>>> // -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!
>>>> configurations {
>>>>     flinkShadowJar // dependencies which go into the shadowJar
>>>>
>>>>     // always exclude these (also from transitive dependencies) since they are provided by Flink
>>>>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>>>>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>>>>     flinkShadowJar.exclude group: 'org.slf4j'
>>>>     flinkShadowJar.exclude group: 'log4j'
>>>> }
>>>>
>>>> // declare the dependencies for your production and test code
>>>> dependencies {
>>>>     // --------------------------------------------------------------
>>>>     // Compile-time dependencies that should NOT be part of the
>>>>     // shadow jar and are provided in the lib folder of Flink
>>>>     // --------------------------------------------------------------
>>>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>>>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>>>
>>>>     // --------------------------------------------------------------
>>>>     // Dependencies that should be part of the shadow jar, e.g.
>>>>     // connectors. These must be in the flinkShadowJar configuration!
>>>>     // --------------------------------------------------------------
>>>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>>>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>>>     flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
>>>>     flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>>>     flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>>>>     compileOnly "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>>>>     compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>>>>     flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>>>>     flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>>>>
>>>>     compile "log4j:log4j:${log4jVersion}"
>>>>     compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>>>>
>>>>     // Add test dependencies here.
>>>>     // testCompile "junit:junit:4.12"
>>>>     testImplementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>>>>     testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>>>> }
>>>>
>>>> // make compileOnly dependencies available for tests:
>>>> sourceSets {
>>>>     main.compileClasspath += configurations.flinkShadowJar
>>>>     main.runtimeClasspath += configurations.flinkShadowJar
>>>>
>>>>     test.compileClasspath += configurations.flinkShadowJar
>>>>     test.runtimeClasspath += configurations.flinkShadowJar
>>>>
>>>>     javadoc.classpath += configurations.flinkShadowJar
>>>> }
>>>>
>>>> run.classpath = sourceSets.main.runtimeClasspath
>>>>
>>>> jar {
>>>>     manifest {
>>>>         attributes 'Built-By': System.getProperty('user.name'),
>>>>                 'Build-Jdk': System.getProperty('java.version')
>>>>     }
>>>> }
>>>>
>>>> shadowJar {
>>>>     configurations = [project.configurations.flinkShadowJar]
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com> wrote:
>>>>
>>>>> hi kant,
>>>>>
>>>>> > Also why do I need to convert to DataStream to print the rows of a
>>>>> > table? Why not have a print method in the Table itself?
>>>>>
>>>>> Flink 1.10 introduces a utility class named TableUtils to convert a
>>>>> Table to List<Row>. This utility class is mainly used for demonstration or
>>>>> testing and is only applicable to *small batch jobs* and small
>>>>> finite *append-only stream jobs*. For example:
>>>>>
>>>>> Table table = tEnv.sqlQuery("select ...");
>>>>> List<Row> result = TableUtils.collectToList(table);
>>>>> result.....
>>>>>
>>>>> Currently, we are planning to implement Table#collect [1]; after
>>>>> that, Table#head and Table#print may also be introduced soon.
>>>>>
>>>>> > The program finished with the following exception:
>>>>>
>>>>> Please make sure that the Kafka version in the Test class and the Kafka
>>>>> version in the pom dependency are the same. I tested your code successfully.
>>>>>
>>>>> Bests,
>>>>> Godfrey
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-14807
>>>>>
>>>>>
>>>>> On Sun, Mar 1, 2020 at 4:44 PM Benchao Li <libenc...@gmail.com> wrote:
>>>>>
>>>>>> Hi kant,
>>>>>>
>>>>>> The CSV format is an independent module; you need to add it as a
>>>>>> dependency.
>>>>>>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.flink</groupId>
>>>>>>     <artifactId>flink-csv</artifactId>
>>>>>>     <version>${flink.version}</version>
>>>>>> </dependency>
>>>>>>
>>>>>>
>>>>>> On Sun, Mar 1, 2020 at 3:43 PM kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>>> ------------------------------------------------------------
>>>>>>>  The program finished with the following exception:
>>>>>>>
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>>>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>>>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>>>>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>>>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>>>>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>>>>>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>>>>>> Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>>>>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>>>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>>>>>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>>>>>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>>>>>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>>>>>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>>>>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>>>>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>>>>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>>>>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>>>>>>>     at Test.main(Test.java:34)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>>>>>     ... 8 more
>>>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
>>>>>>> the classpath.
>>>>>>>
>>>>>>> Reason: Required context properties mismatch.
>>>>>>>
>>>>>>> The matching candidates:
>>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>>>> Mismatched properties:
>>>>>>> 'connector.type' expects 'filesystem', but is 'kafka'
>>>>>>>
>>>>>>> The following properties are requested:
>>>>>>> connector.property-version=1
>>>>>>> connector.topic=test-topic1
>>>>>>> connector.type=kafka
>>>>>>> connector.version=0.11
>>>>>>> format.property-version=1
>>>>>>> format.type=csv
>>>>>>> schema.0.data-type=VARCHAR(2147483647)
>>>>>>> schema.0.name=f0
>>>>>>> update-mode=append
>>>>>>>
>>>>>>> The following factories have been considered:
>>>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>>>>     at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>>>>>>     at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>>>>>>     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>>>>>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>>>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>>>>>>>     ... 34 more
>>>>>>>
>>>>>>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> > Thanks for the pointer. Looks like the documentation says to use
>>>>>>>> > tableEnv.registerTableSink however in my IDE it shows the method is
>>>>>>>> > deprecated in Flink 1.10.
>>>>>>>>
>>>>>>>> It looks like not all of the documentation was updated after the
>>>>>>>> methods were deprecated. However, if you look at the Javadoc of the
>>>>>>>> `registerTableSink` method, you can find an answer [1].
>>>>>>>>
>>>>>>>> > It still doesn't work because it says for CSV the connector.type
>>>>>>>> > should be filesystem not Kafka.
>>>>>>>>
>>>>>>>> Can you post the full stack trace? I’m not familiar with the
>>>>>>>> Table API, so maybe Timo or Dawid know what’s going on here?
>>>>>>>>
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>>>>>>>>
>>>>>>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Here is my updated code after digging through the source code (not
>>>>>>>> sure if it is correct). It still doesn't work because it says that for
>>>>>>>> CSV the connector.type should be filesystem, not Kafka, but the
>>>>>>>> documentation says it is supported.
>>>>>>>>
>>>>>>>>
>>>>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>>>>> import org.apache.flink.runtime.state.StateBackend;
>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>> import org.apache.flink.table.descriptors.Csv;
>>>>>>>> import org.apache.flink.table.descriptors.Kafka;
>>>>>>>> import org.apache.flink.table.descriptors.Schema;
>>>>>>>>
>>>>>>>> public class Test {
>>>>>>>>
>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>                 .inStreamingMode()
>>>>>>>>                 .build();
>>>>>>>>
>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>         streamExecutionEnvironment.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>>>>>>>>         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>>
>>>>>>>>         tableEnvironment
>>>>>>>>             .connect(
>>>>>>>>                 new Kafka()
>>>>>>>>                     .version("0.11")
>>>>>>>>                     .topic("test-topic1")
>>>>>>>>             )
>>>>>>>>             .withFormat(new Csv())
>>>>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>>>>             .inAppendMode()
>>>>>>>>             .createTemporaryTable("kafka_source");
>>>>>>>>
>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>>>>
>>>>>>>>         tableEnvironment
>>>>>>>>             .connect(
>>>>>>>>                 new Kafka()
>>>>>>>>                     .version("0.11")
>>>>>>>>                     .topic("test-topic2")
>>>>>>>>             )
>>>>>>>>             .withFormat(new Csv())
>>>>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>>>>             .inAppendMode()
>>>>>>>>             .createTemporaryTable("kafka_target");
>>>>>>>>
>>>>>>>>         tableEnvironment.insertInto("kafka_target", resultTable);
>>>>>>>>         tableEnvironment.execute("Sample Job");
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Benchao,
>>>>>>>>>
>>>>>>>>> Agreed, a ConsoleSink is very useful, but that is not the only
>>>>>>>>> problem here. The documentation says to use tableEnv.registerTableSink
>>>>>>>>> all over the place
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>>>>>>>>> however, that function is deprecated. So how do I add any other sink?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi kant,
>>>>>>>>>>
>>>>>>>>>> AFAIK, there is no "print to stdout" sink for the Table API now. You
>>>>>>>>>> can implement a custom sink following this doc [1].
>>>>>>>>>>
>>>>>>>>>> IMO, an out-of-the-box print table sink would be very useful, and I've
>>>>>>>>>> created an issue [2] to track this.
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>>>>>>>>
>>>>>>>>>> On Sun, Mar 1, 2020 at 2:30 AM kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the pointer. It looks like the documentation says to use
>>>>>>>>>>> tableEnv.registerTableSink; however, my IDE shows that the method is
>>>>>>>>>>> deprecated in Flink 1.10. So I am still not seeing a way to add a sink
>>>>>>>>>>> that can print to stdout. What sink should I use to print to stdout,
>>>>>>>>>>> and how do I add it without converting to a DataStream?
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <
>>>>>>>>>>> pi...@ververica.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> You shouldn’t be using `KafkaTableSource`, as it’s marked
>>>>>>>>>>>> @Internal. It’s not part of any public API.
>>>>>>>>>>>>
>>>>>>>>>>>> You don’t have to convert a DataStream into a Table to read from
>>>>>>>>>>>> Kafka in the Table API. I guess you could, if you had used the
>>>>>>>>>>>> DataStream API’s FlinkKafkaConsumer, as documented here [1].
>>>>>>>>>>>>
>>>>>>>>>>>> But you should be able to use the Kafka Table connector directly,
>>>>>>>>>>>> as described in the docs [2][3].
>>>>>>>>>>>>
>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>>>>>>>>> [2]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>>>>>>>>> [3]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>>>>>>>>
>>>>>>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Also why do I need to convert to DataStream to print the rows
>>>>>>>>>>>> of a table? Why not have a print method in the Table itself?
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Do I need to use the DataStream API or the Table API to construct
>>>>>>>>>>>>> sources? I am just trying to read from Kafka and print it to the
>>>>>>>>>>>>> console. Yes, I tried it with DataStreams and it works fine, but I
>>>>>>>>>>>>> want to do it using the Table-related APIs. I don't see any
>>>>>>>>>>>>> documentation or a sample on how to create a Kafka table source or
>>>>>>>>>>>>> any other source using the Table Source APIs, so after some digging
>>>>>>>>>>>>> I wrote the following code. My ultimate goal is to avoid the
>>>>>>>>>>>>> DataStream API as much as possible and just use the Table API & SQL,
>>>>>>>>>>>>> but somehow I feel the Flink framework focuses on DataStream more
>>>>>>>>>>>>> than on the SQL interface. Am I wrong? From the user's perspective,
>>>>>>>>>>>>> wouldn't it make more sense to focus on SQL interfaces for both
>>>>>>>>>>>>> streaming and batch?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>>>>>>>>> import org.apache.flink.types.Row;
>>>>>>>>>>>>>
>>>>>>>>>>>>> import java.io.IOException;
>>>>>>>>>>>>> import java.util.Properties;
>>>>>>>>>>>>>
>>>>>>>>>>>>> public class Test {
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>>>>>>>>             return Row.of(new String(message));
>>>>>>>>>>>>>         }
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>>>>>>         Test test = new Test();
>>>>>>>>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>>>>>>                 .inStreamingMode()
>>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>>
>>>>>>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>>>>>>>
>>>>>>>>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>>>>>>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>>>>>>>>>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>>>>>>>>>>>
>>>>>>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>>>>>>>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>>>>>
>>>>>>>>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>>>>>>>>         TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>>>>>>>>>>>>         Properties properties = new Properties();
>>>>>>>>>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>>>>>>>>         properties.setProperty("group.id", "test");
>>>>>>>>>>>>>         return new KafkaTableSource(tableSchema, "test-topic1", properties, new MyDeserializationSchema());
>>>>>>>>>>>>>     }
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> I get the following error
>>>>>>>>>>>>>
>>>>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>>>>
>>>>>>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not
>>>>>>>>>>>>> serializable. The object probably contains or references non 
>>>>>>>>>>>>> serializable
>>>>>>>>>>>>> fields.
>>>>>>>>>>>>>
>>>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>>>>>>>>> Test.main(Test.java:40)
>>>>>>>>>>>>>
>>>>>>>>>>>>> The error seems to be on the line
>>>>>>>>>>>>>
>>>>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>>>>>
>>>>>>>>>>>>> and I am not sure why it cannot be serialized.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> Benchao Li
>>>>>>>>>> School of Electronics Engineering and Computer Science, Peking University
>>>>>>>>>> Tel:+86-15650713730
>>>>>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>
>>
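
A note on the serialization error quoted above. Flink ships source functions to the cluster using Java serialization, so everything reachable from the Kafka consumer, including the deserialization schema handed to KafkaTableSource, has to be serializable. One common cause of this message (an assumption here, since the cause is not confirmed in this thread) is a schema class that keeps a non-serializable helper as a regular field, or that is a non-static inner class and so drags its enclosing instance along. The sketch below uses only the JDK; Parser, BrokenSchema, FixedSchema and isSerializable are hypothetical names made up for illustration and are not Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableCheck {

    // Hypothetical helper that does NOT implement Serializable.
    static class Parser {
        String parse(byte[] bytes) {
            return new String(bytes);
        }
    }

    // Hypothetical schema holding the helper as a regular field:
    // Java serialization of this object fails.
    static class BrokenSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Parser parser = new Parser();
    }

    // Same idea, but the helper is transient and created lazily,
    // so it is skipped during serialization.
    static class FixedSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient Parser parser;

        String deserialize(byte[] bytes) {
            if (parser == null) {
                parser = new Parser();
            }
            return parser.parse(bytes);
        }
    }

    // Returns true if plain Java serialization of the object succeeds.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("BrokenSchema serializable: " + isSerializable(new BrokenSchema()));
        System.out.println("FixedSchema serializable: " + isSerializable(new FixedSchema()));
    }
}
```

Running the same kind of check against an instance of MyDeserializationSchema before building the table source may help narrow down which field is responsible.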
