Hi Arvid,

Yes, I got it, and it works, as I said in my previous email.

Thanks!


On Mon, Mar 2, 2020 at 12:10 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Kant,
>
> I think Dawid meant that you should not put the Kafka version number into
> the dependency, like this:
>
> flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>
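For what it's worth, the lookup failure quoted further down follows from how a `group:artifact:version` coordinate is turned into a repository path: whatever comes after the last colon is treated as the artifact version. Here is a minimal Java sketch of that mapping. The class and method names are made up for illustration, and it only models the standard Maven repository layout, not Gradle's actual resolver.

```java
// Illustrative only: maps a "group:artifact:version" coordinate to the POM URL
// that a Maven-layout repository would be asked for. Not Gradle's real resolver.
class CoordinateToUrl {

    static String pomUrl(String repoBase, String coordinate) {
        String[] parts = coordinate.split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected group:artifact:version, got " + coordinate);
        }
        String group = parts[0];
        String artifact = parts[1];
        String version = parts[2];
        // Dots in the group id become directory separators.
        return repoBase + "/" + group.replace('.', '/') + "/" + artifact + "/" + version
                + "/" + artifact + "-" + version + ".pom";
    }

    public static void main(String[] args) {
        String repo = "https://repo.maven.apache.org/maven2";
        // "universal" used as the artifact version: this is the path from the error message.
        System.out.println(pomUrl(repo, "org.apache.flink:flink-sql-connector-kafka_2.11:universal"));
        // The Flink release version used as the artifact version, as suggested above.
        System.out.println(pomUrl(repo, "org.apache.flink:flink-sql-connector-kafka_2.11:1.10.0"));
    }
}
```

The first call reproduces the first `.pom` location listed in the error, which suggests "universal" was simply read as a (non-existent) artifact version.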
>
> On Sun, Mar 1, 2020 at 7:31 PM kant kodali <kanth...@gmail.com> wrote:
>
>> * What went wrong:
>> Could not determine the dependencies of task ':shadowJar'.
>> > Could not resolve all dependencies for configuration ':flinkShadowJar'.
>>    > Could not find org.apache.flink:flink-sql-connector-kafka_2.11:universal.
>>      Searched in the following locations:
>>        - https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
>>        - https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
>>        - https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
>>        - https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
>>      Required by:
>>          project :
>>
>>
>>
>> On Sun, Mar 1, 2020 at 6:43 AM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Hi Kant,
>>>
>>> If you want to use the *universal* Kafka connector, use "universal" for
>>> the version. The community decided to no longer distinguish between
>>> different Kafka connector versions, but to use the newest Kafka client
>>> version for all versions of Kafka 1.0+. So if you want to use the connector
>>> from flink-sql-connector-kafka_2.11, use "universal" for the version.
>>>
>>> As for the collect/print sink: we do realize the importance of such a sink,
>>> and there have been a few approaches to implementing one, including the
>>> TableUtils mentioned by Godfrey. It does not have strong consistency
>>> guarantees and is recommended only for experiments and testing. There is
>>> also an ongoing discussion on how to implement such a sink for *both* batch
>>> and streaming here:
>>> https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17046455#comment-17046455
>>>
>>> Best,
>>>
>>> Dawid
>>> On 01/03/2020 12:00, kant kodali wrote:
>>>
>>> Hi Benchao,
>>>
>>> That worked! Pasting the build.gradle file here. However, this only works
>>> for 0.11, and it needs zookeeper.connect(), which shouldn't be required. I
>>> am not sure why the Flink Kafka connector requires it. If I change the
>>> version to 2.2 in the code and specify this jar
>>>
>>> flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>>>
>>> or
>>>
>>> flinkShadowJar "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" // Not sure if I should use this one for Kafka >= 0.11
>>>
>>> it doesn't work either.
>>>
>>>
>>> buildscript {
>>>     repositories {
>>>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>>>     }
>>>     dependencies {
>>>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>>>     }
>>> }
>>>
>>> plugins {
>>>     id 'java'
>>>     id 'application'
>>> }
>>>
>>> mainClassName = 'Test'
>>> apply plugin: 'com.github.johnrengelman.shadow'
>>>
>>> ext {
>>>     javaVersion = '1.8'
>>>     flinkVersion = '1.10.0'
>>>     scalaBinaryVersion = '2.11'
>>>     slf4jVersion = '1.7.7'
>>>     log4jVersion = '1.2.17'
>>> }
>>>
>>> sourceCompatibility = javaVersion
>>> targetCompatibility = javaVersion
>>> tasks.withType(JavaCompile) {
>>>     options.encoding = 'UTF-8'
>>> }
>>>
>>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>>
>>> // declare where to find the dependencies of your project
>>> repositories {
>>>     mavenCentral()
>>>     maven { url "https://repository.apache.org/content/repositories/snapshots/" }
>>> }
>>>
>>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
>>> // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
>>> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
>>> // -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!
>>> configurations {
>>>     flinkShadowJar // dependencies which go into the shadowJar
>>>
>>>     // always exclude these (also from transitive dependencies) since they are provided by Flink
>>>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>>>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>>>     flinkShadowJar.exclude group: 'org.slf4j'
>>>     flinkShadowJar.exclude group: 'log4j'
>>> }
>>>
>>> // declare the dependencies for your production and test code
>>> dependencies {
>>>     // --------------------------------------------------------------
>>>     // Compile-time dependencies that should NOT be part of the
>>>     // shadow jar and are provided in the lib folder of Flink
>>>     // --------------------------------------------------------------
>>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>>
>>>     // --------------------------------------------------------------
>>>     // Dependencies that should be part of the shadow jar, e.g.
>>>     // connectors. These must be in the flinkShadowJar configuration!
>>>     // --------------------------------------------------------------
>>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>>     flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
>>>     // tried doesnt work. same problem
>>>     //flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>>     // tried doesnt work. same problem
>>>     //flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>>>     //tried doesnt work. same problem
>>>     flinkShadowJar "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}"
>>>     flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>>>     compileOnly "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>>>     compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>>>     flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>>>     flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>>>
>>>     compile "log4j:log4j:${log4jVersion}"
>>>     compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>>>
>>>     // Add test dependencies here.
>>>     // testCompile "junit:junit:4.12"
>>>     testImplementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>>>     testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>>> }
>>>
>>> // make compileOnly dependencies available for tests:
>>> sourceSets {
>>>     main.compileClasspath += configurations.flinkShadowJar
>>>     main.runtimeClasspath += configurations.flinkShadowJar
>>>
>>>     test.compileClasspath += configurations.flinkShadowJar
>>>     test.runtimeClasspath += configurations.flinkShadowJar
>>>
>>>     javadoc.classpath += configurations.flinkShadowJar
>>> }
>>>
>>> run.classpath = sourceSets.main.runtimeClasspath
>>>
>>> jar {
>>>     manifest {
>>>         attributes 'Built-By': System.getProperty('user.name'),
>>>                 'Build-Jdk': System.getProperty('java.version')
>>>     }
>>> }
>>>
>>> shadowJar {
>>>     configurations = [project.configurations.flinkShadowJar]
>>>     mergeServiceFiles()
>>>     manifest {
>>>         attributes 'Main-Class': mainClassName
>>>     }
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Mar 1, 2020 at 1:38 AM Benchao Li <libenc...@gmail.com> wrote:
>>>
>>>> I don't know how Gradle works, but in Maven, when packaging dependencies
>>>> into one fat jar you need to specify how SPI property files should be
>>>> handled, for example:
>>>>
>>>> <transformers>
>>>>     <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>>>> </transformers>
>>>>
>>>> Could you check that your final jar contains the correct resource file?
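To illustrate why this matters: Flink's table factories are discovered through Java's ServiceLoader mechanism, which reads `META-INF/services/...` files from the classpath. When several jars ship a file with the same name and the fat jar keeps only one copy, the other entries are lost. Below is a minimal sketch of the difference in plain Java, with no Flink or build-tool APIs involved. The two sample jars and the `example.*` factory names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative model of fat-jar packaging: each "jar" is a map from resource path to its lines.
class ServiceFileMerge {

    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.TableFactory";

    // Naive packaging: a later jar's resource replaces an earlier one with the same path.
    static Map<String, List<String>> lastOneWins(List<Map<String, List<String>>> jars) {
        Map<String, List<String>> fatJar = new LinkedHashMap<>();
        for (Map<String, List<String>> jar : jars) {
            fatJar.putAll(jar);
        }
        return fatJar;
    }

    // Merging packaging: lines of same-named resources are combined, without duplicates.
    static Map<String, List<String>> merged(List<Map<String, List<String>>> jars) {
        Map<String, Set<String>> acc = new LinkedHashMap<>();
        for (Map<String, List<String>> jar : jars) {
            for (Map.Entry<String, List<String>> e : jar.entrySet()) {
                acc.computeIfAbsent(e.getKey(), k -> new LinkedHashSet<>()).addAll(e.getValue());
            }
        }
        Map<String, List<String>> fatJar = new LinkedHashMap<>();
        acc.forEach((path, lines) -> fatJar.put(path, new ArrayList<>(lines)));
        return fatJar;
    }

    // Two hypothetical jars that both register a factory under the same service file.
    static List<Map<String, List<String>>> sampleJars() {
        Map<String, List<String>> kafkaJar = new LinkedHashMap<>();
        kafkaJar.put(SERVICE_FILE, Arrays.asList("example.KafkaTableFactory"));
        Map<String, List<String>> csvJar = new LinkedHashMap<>();
        csvJar.put(SERVICE_FILE, Arrays.asList("example.CsvTableFactory"));
        return Arrays.asList(kafkaJar, csvJar);
    }

    public static void main(String[] args) {
        System.out.println("last one wins: " + lastOneWins(sampleJars()).get(SERVICE_FILE));
        System.out.println("merged:        " + merged(sampleJars()).get(SERVICE_FILE));
    }
}
```

With the naive strategy only the CSV entry survives, which would match an error where only CSV factories are considered. In the Gradle Shadow plugin, `mergeServiceFiles()` inside the `shadowJar` block plays the role of the `ServicesResourceTransformer`.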
>>>>
>>>> On Sun, Mar 1, 2020 at 5:25 PM godfrey he <godfre...@gmail.com> wrote:
>>>>
>>>>> I think you should use `flink-sql-connector-kafka-0.11_2.11`
>>>>> instead of `flink-connector-kafka_2.11`.
>>>>>
>>>>> Bests,
>>>>> Godfrey
>>>>>
>>>>> On Sun, Mar 1, 2020 at 5:15 PM kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> The dependency was already there. Below is my build.gradle. I also
>>>>>> checked the Kafka version, and it looks like the jar
>>>>>>
>>>>>> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>>>>>
>>>>>> downloads kafka-clients version 2.2.0. So I changed the version in my
>>>>>> code to 2.2.0, and the same problem persists.
>>>>>>
>>>>>> buildscript {
>>>>>>     repositories {
>>>>>>         jcenter() // this applies only to the Gradle 'Shadow' plugin
>>>>>>     }
>>>>>>     dependencies {
>>>>>>         classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> plugins {
>>>>>>     id 'java'
>>>>>>     id 'application'
>>>>>> }
>>>>>>
>>>>>> mainClassName = 'Test'
>>>>>> apply plugin: 'com.github.johnrengelman.shadow'
>>>>>>
>>>>>> ext {
>>>>>>     javaVersion = '1.8'
>>>>>>     flinkVersion = '1.10.0'
>>>>>>     scalaBinaryVersion = '2.11'
>>>>>>     slf4jVersion = '1.7.7'
>>>>>>     log4jVersion = '1.2.17'
>>>>>> }
>>>>>>
>>>>>> sourceCompatibility = javaVersion
>>>>>> targetCompatibility = javaVersion
>>>>>> tasks.withType(JavaCompile) {
>>>>>>     options.encoding = 'UTF-8'
>>>>>> }
>>>>>>
>>>>>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>>>>>
>>>>>> // declare where to find the dependencies of your project
>>>>>> repositories {
>>>>>>     mavenCentral()
>>>>>>     maven { url "https://repository.apache.org/content/repositories/snapshots/" }
>>>>>> }
>>>>>>
>>>>>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
>>>>>> // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
>>>>>> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
>>>>>> // -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!
>>>>>> configurations {
>>>>>>     flinkShadowJar // dependencies which go into the shadowJar
>>>>>>
>>>>>>     // always exclude these (also from transitive dependencies) since they are provided by Flink
>>>>>>     flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>>>>>>     flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>>>>>>     flinkShadowJar.exclude group: 'org.slf4j'
>>>>>>     flinkShadowJar.exclude group: 'log4j'
>>>>>> }
>>>>>>
>>>>>> // declare the dependencies for your production and test code
>>>>>> dependencies {
>>>>>>     // --------------------------------------------------------------
>>>>>>     // Compile-time dependencies that should NOT be part of the
>>>>>>     // shadow jar and are provided in the lib folder of Flink
>>>>>>     // --------------------------------------------------------------
>>>>>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>>>>>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>>>>>
>>>>>>     // --------------------------------------------------------------
>>>>>>     // Dependencies that should be part of the shadow jar, e.g.
>>>>>>     // connectors. These must be in the flinkShadowJar configuration!
>>>>>>     // --------------------------------------------------------------
>>>>>>     compile "org.apache.flink:flink-java:${flinkVersion}"
>>>>>>     compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>>>>>     flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
>>>>>>     flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>>>>>     flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>>>>>>     compileOnly "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>>>>>>     compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>>>>>>     flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>>>>>>     flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>>>>>>
>>>>>>     compile "log4j:log4j:${log4jVersion}"
>>>>>>     compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>>>>>>
>>>>>>     // Add test dependencies here.
>>>>>>     // testCompile "junit:junit:4.12"
>>>>>>     testImplementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
>>>>>>     testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>>>>>> }
>>>>>>
>>>>>> // make compileOnly dependencies available for tests:
>>>>>> sourceSets {
>>>>>>     main.compileClasspath += configurations.flinkShadowJar
>>>>>>     main.runtimeClasspath += configurations.flinkShadowJar
>>>>>>
>>>>>>     test.compileClasspath += configurations.flinkShadowJar
>>>>>>     test.runtimeClasspath += configurations.flinkShadowJar
>>>>>>
>>>>>>     javadoc.classpath += configurations.flinkShadowJar
>>>>>> }
>>>>>>
>>>>>> run.classpath = sourceSets.main.runtimeClasspath
>>>>>>
>>>>>> jar {
>>>>>>     manifest {
>>>>>>         attributes 'Built-By': System.getProperty('user.name'),
>>>>>>                 'Build-Jdk': System.getProperty('java.version')
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> shadowJar {
>>>>>>     configurations = [project.configurations.flinkShadowJar]
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> hi kant,
>>>>>>>
>>>>>>> > Also why do I need to convert to DataStream to print the rows of a
>>>>>>> table? Why not have a print method in the Table itself?
>>>>>>> Flink 1.10 introduces a utility class named TableUtils that converts a
>>>>>>> Table to a List<Row>. This utility class is mainly meant for
>>>>>>> demonstration or testing and is only applicable to *small batch jobs*
>>>>>>> and small finite *append-only stream jobs*. The code looks like:
>>>>>>> Table table = tEnv.sqlQuery("select ...");
>>>>>>> List<Row> result = TableUtils.collectToList(table);
>>>>>>> result.....
>>>>>>>
>>>>>>> Currently, we are planning to implement Table#collect [1]; after
>>>>>>> that, Table#head and Table#print may also be introduced soon.
>>>>>>>
>>>>>>> >  The program finished with the following exception:
>>>>>>> Please make sure that the Kafka version in the Test class and the Kafka
>>>>>>> version in the pom dependency are the same. I tested your code
>>>>>>> successfully.
>>>>>>>
>>>>>>> Bests,
>>>>>>> Godfrey
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-14807
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Mar 1, 2020 at 4:44 PM Benchao Li <libenc...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi kant,
>>>>>>>>
>>>>>>>> The CSV format is an independent module; you need to add it as a
>>>>>>>> dependency:
>>>>>>>>
>>>>>>>> <dependency>
>>>>>>>>     <groupId>org.apache.flink</groupId>
>>>>>>>>     <artifactId>flink-csv</artifactId>
>>>>>>>>     <version>${flink.version}</version>
>>>>>>>> </dependency>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Mar 1, 2020 at 3:43 PM kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>  The program finished with the following exception:
>>>>>>>>>
>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>>>>>>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>>>>>>> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>>>>>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>>>>>>> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>>>>>>>> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>>>>>>>> at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>>>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>>>>>>>> Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>>>>>>>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>>>>>>>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>>>>>>>>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>>>>>>>>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>>>>>>>>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>>>>>>>>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>>>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>>>>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>>>>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>>>>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>>>>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>>>>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>>>>>>>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>>>>>>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>>>>>>>>> at Test.main(Test.java:34)
>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>>>>>>> ... 8 more
>>>>>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
>>>>>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
>>>>>>>>>
>>>>>>>>> Reason: Required context properties mismatch.
>>>>>>>>>
>>>>>>>>> The matching candidates:
>>>>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>>>>>> Mismatched properties:
>>>>>>>>> 'connector.type' expects 'filesystem', but is 'kafka'
>>>>>>>>>
>>>>>>>>> The following properties are requested:
>>>>>>>>> connector.property-version=1
>>>>>>>>> connector.topic=test-topic1
>>>>>>>>> connector.type=kafka
>>>>>>>>> connector.version=0.11
>>>>>>>>> format.property-version=1
>>>>>>>>> format.type=csv
>>>>>>>>> schema.0.data-type=VARCHAR(2147483647)
>>>>>>>>> schema.0.name=f0
>>>>>>>>> update-mode=append
>>>>>>>>>
>>>>>>>>> The following factories have been considered:
>>>>>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>>>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>>>>>> at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>>>>>>>> at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>>>>>>>> at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>>>>>>> at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>>>>>>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>>>>>>>>> ... 34 more
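Reading the message above: only the two CSV filesystem factories were found on the classpath, so nothing that declares `connector.type=kafka` was available to match. The following plain-Java sketch is a simplified model of that kind of context matching. It is not Flink's `TableFactoryService` code, and the `Factory` class, the `props` helper and the `HypotheticalKafkaFactory` name are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of matching requested properties against factory contexts.
class FactoryMatchSketch {

    // A factory announces the property values it requires in order to be selected.
    static final class Factory {
        final String name;
        final Map<String, String> requiredContext;

        Factory(String name, Map<String, String> requiredContext) {
            this.name = name;
            this.requiredContext = requiredContext;
        }
    }

    // Returns the names of all discovered factories whose required context is met by the request.
    static List<String> matching(List<Factory> discovered, Map<String, String> requested) {
        List<String> result = new ArrayList<>();
        for (Factory factory : discovered) {
            boolean allMatch = true;
            for (Map.Entry<String, String> required : factory.requiredContext.entrySet()) {
                if (!required.getValue().equals(requested.get(required.getKey()))) {
                    allMatch = false;
                    break;
                }
            }
            if (allMatch) {
                result.add(factory.name);
            }
        }
        return result;
    }

    // Small helper to build a property map from alternating keys and values.
    static Map<String, String> props(String... keyValues) {
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            map.put(keyValues[i], keyValues[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, String> requested = props(
                "connector.type", "kafka",
                "connector.version", "0.11",
                "format.type", "csv");

        Factory csvFile = new Factory("CsvAppendTableSourceFactory",
                props("connector.type", "filesystem", "format.type", "csv"));
        Factory kafka = new Factory("HypotheticalKafkaFactory",
                props("connector.type", "kafka", "connector.version", "0.11"));

        // Only the CSV filesystem factory is on the classpath: nothing matches.
        System.out.println(matching(Arrays.asList(csvFile), requested));
        // Once a Kafka factory is discoverable as well, it is selected.
        System.out.println(matching(Arrays.asList(csvFile, kafka), requested));
    }
}
```

Under this model, the fix is not to change the requested properties but to make a Kafka factory discoverable, i.e. to get the right connector jar and its service entries into the classpath.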
>>>>>>>>>
>>>>>>>>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <
>>>>>>>>> pi...@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> > Thanks for the pointer. Looks like the documentation says to
>>>>>>>>>> use tableEnv.registerTableSink however in my IDE it shows the method
>>>>>>>>>> is deprecated in Flink 1.10.
>>>>>>>>>>
>>>>>>>>>> It looks like not all of the documentation was updated after the
>>>>>>>>>> methods were deprecated. However, if you look at the Javadoc of the
>>>>>>>>>> `registerTableSink` method, you can find an answer [1].
>>>>>>>>>>
>>>>>>>>>> >  It still doesn't work because it says for CSV the
>>>>>>>>>> connector.type should be filesystem not Kafka.
>>>>>>>>>>
>>>>>>>>>> Can you post the full stack trace? As I’m not familiar with the
>>>>>>>>>> Table API, maybe Timo or Dawid know what’s going on here?
>>>>>>>>>>
>>>>>>>>>> Piotrek
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>>>>>>>>>>
>>>>>>>>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Here is my updated code after digging through the source code
>>>>>>>>>> (not sure if it is correct). It still doesn't work because it says
>>>>>>>>>> that for CSV the connector.type should be filesystem, not Kafka, but
>>>>>>>>>> the documentation says it is supported.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>>>>>>> import org.apache.flink.runtime.state.StateBackend;
>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>>>> import org.apache.flink.table.descriptors.Csv;
>>>>>>>>>> import org.apache.flink.table.descriptors.Kafka;
>>>>>>>>>> import org.apache.flink.table.descriptors.Schema;
>>>>>>>>>>
>>>>>>>>>> public class Test {
>>>>>>>>>>
>>>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>>>                 .inStreamingMode()
>>>>>>>>>>                 .build();
>>>>>>>>>>
>>>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>>>>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>         streamExecutionEnvironment.setStateBackend(
>>>>>>>>>>                 (StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>>>>>>>>>>         StreamTableEnvironment tableEnvironment =
>>>>>>>>>>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>>>>
>>>>>>>>>>         tableEnvironment
>>>>>>>>>>             .connect(
>>>>>>>>>>                 new Kafka()
>>>>>>>>>>                     .version("0.11")
>>>>>>>>>>                     .topic("test-topic1")
>>>>>>>>>>             )
>>>>>>>>>>             .withFormat(new Csv())
>>>>>>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>>>>>>             .inAppendMode()
>>>>>>>>>>             .createTemporaryTable("kafka_source");
>>>>>>>>>>
>>>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>>>>>>
>>>>>>>>>>         tableEnvironment
>>>>>>>>>>             .connect(
>>>>>>>>>>                 new Kafka()
>>>>>>>>>>                     .version("0.11")
>>>>>>>>>>                     .topic("test-topic2")
>>>>>>>>>>             )
>>>>>>>>>>             .withFormat(new Csv())
>>>>>>>>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>>>>>>>>             .inAppendMode()
>>>>>>>>>>             .createTemporaryTable("kafka_target");
>>>>>>>>>>
>>>>>>>>>>         tableEnvironment.insertInto("kafka_target", resultTable);
>>>>>>>>>>         tableEnvironment.execute("Sample Job");
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Benchao,
>>>>>>>>>>>
>>>>>>>>>>> Agreed, a ConsoleSink would be very useful, but that is not the only
>>>>>>>>>>> problem here. The documentation says to use
>>>>>>>>>>> tableEnv.registerTableSink all over the place
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>>>>>>>>>>> however, that function is deprecated. So how do I add any other sink?
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi kant,
>>>>>>>>>>>>
>>>>>>>>>>>> AFAIK, there is no "print to stdout" sink for the Table API right
>>>>>>>>>>>> now, but you can implement a custom sink following this doc [1].
>>>>>>>>>>>>
>>>>>>>>>>>> IMO, an out-of-the-box print table sink would be very useful, and
>>>>>>>>>>>> I've created an issue [2] to track this.
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Mar 1, 2020 at 2:30 AM kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the pointer. It looks like the documentation says to
>>>>>>>>>>>>> use tableEnv.registerTableSink; however, my IDE shows that the
>>>>>>>>>>>>> method is deprecated in Flink 1.10. So I still don't see a way to
>>>>>>>>>>>>> add a sink that can print to stdout. What sink should I use to
>>>>>>>>>>>>> print to stdout, and how do I add it without converting into a
>>>>>>>>>>>>> DataStream?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <
>>>>>>>>>>>>> pi...@ververica.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked
>>>>>>>>>>>>>> @Internal. It’s not part of any public API.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> You don’t have to convert a DataStream into a Table to read from
>>>>>>>>>>>>>> Kafka in the Table API. I guess you could, if you had used the
>>>>>>>>>>>>>> DataStream API’s FlinkKafkaConsumer as documented here [1].
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But you should be able to use Kafka Table connector directly,
>>>>>>>>>>>>>> as it is described in the docs [2][3].
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>>>>>>>>>>> [3]
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also why do I need to convert to DataStream to print the rows
>>>>>>>>>>>>>> of a table? Why not have a print method in the Table itself?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <
>>>>>>>>>>>>>> kanth...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Do I need to use the DataStream API or the Table API to construct
>>>>>>>>>>>>>>> sources? I am just trying to read from Kafka and print it to the
>>>>>>>>>>>>>>> console. I tried it with DataStreams and it works fine, but I
>>>>>>>>>>>>>>> want to do it using the Table-related APIs. I don't see any
>>>>>>>>>>>>>>> documentation or a sample on how to create a Kafka table source,
>>>>>>>>>>>>>>> or any other source, using the Table Source APIs, so after some
>>>>>>>>>>>>>>> digging I wrote the following code. My ultimate goal is to avoid
>>>>>>>>>>>>>>> the DataStream API as much as possible and just use the Table API
>>>>>>>>>>>>>>> & SQL, but somehow I feel the Flink framework focuses more on
>>>>>>>>>>>>>>> DataStream than on the SQL interface. Am I wrong? From the user's
>>>>>>>>>>>>>>> perspective, wouldn't it make more sense to focus on SQL
>>>>>>>>>>>>>>> interfaces for both streaming and batch?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>>>>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>>>>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>>>>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>>>>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>>>>>>>>>>> import org.apache.flink.types.Row;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> import java.io.IOException;
>>>>>>>>>>>>>>> import java.util.Properties;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public class Test {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>>>>>>>>>>             return Row.of(new String(message));
>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>>>>>>>>         Test test = new Test();
>>>>>>>>>>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>>>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>>>>>>>>                 .inStreamingMode()
>>>>>>>>>>>>>>>                 .build();
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>>>>>>>>>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>         StreamTableEnvironment tableEnvironment =
>>>>>>>>>>>>>>>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>>>>>>>>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>>>>>>>>>>>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>>>>>>>>>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>>>>>>>>>>         TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>>>>>>>>>>>>>>         Properties properties = new Properties();
>>>>>>>>>>>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>>>>>>>>>>         properties.setProperty("group.id", "test");
>>>>>>>>>>>>>>>         return new KafkaTableSource(tableSchema, "test-topic1", properties,
>>>>>>>>>>>>>>>                 new MyDeserializationSchema());
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I get the following error
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not serializable.
>>>>>>>>>>>>>>> The object probably contains or references non serializable fields.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>>>>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>>>>>>>>>>> Test.main(Test.java:40)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The error seems to be on the line
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> and I am not sure why it is not able to serialize?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>>
>>>>>>>>>>>> Benchao Li
>>>>>>>>>>>> School of Electronics Engineering and Computer Science, Peking University
>>>>>>>>>>>> Tel:+86-15650713730
>>>>>>>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>
>>>>
>>>>
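
A note on the serialization error quoted above: one plausible cause, assuming the code is exactly as posted, is that MyDeserializationSchema is a non-static inner class of Test. An instance of such a class keeps an implicit reference to its enclosing Test object, and Test does not implement Serializable, so Java serialization of the schema (and of the Kafka consumer that holds it) can fail. The sketch below reproduces the effect with plain java.io and no Flink dependency. The names InnerClassSerializationDemo, InnerSchema, StaticSchema and isSerializable are made up for illustration and do not come from Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the Test class in the thread: note that it is
// NOT Serializable itself.
public class InnerClassSerializationDemo {

    private final String label = "outer";

    // Non-static inner class: every instance holds an implicit reference to
    // the enclosing InnerClassSerializationDemo instance (used by describe()).
    public class InnerSchema implements Serializable {
        String describe() {
            return "inner of " + label;
        }
    }

    // Static nested class: no reference to any enclosing instance.
    public static class StaticSchema implements Serializable {
        String describe() {
            return "static nested";
        }
    }

    // Returns true if plain Java serialization of the object succeeds,
    // false if it fails with NotSerializableException.
    static boolean isSerializable(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        InnerClassSerializationDemo outer = new InnerClassSerializationDemo();
        System.out.println("inner:  " + isSerializable(outer.new InnerSchema()));
        System.out.println("static: " + isSerializable(new StaticSchema()));
    }
}
```

Running main prints false for the inner class and true for the static nested class. If this is indeed what happens in the posted job, declaring MyDeserializationSchema as a static nested class (or moving it to its own top-level class) should let the consumer be serialized.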
