Hi Arvid, Yes I got it..and it works as said in my previous email.
Thanks! On Mon, Mar 2, 2020 at 12:10 AM Arvid Heise <ar...@ververica.com> wrote: > Hi Kant, > > I think Dawid meant to not add the Kafka version number like this: > > flinkShadowJar > "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}" > > > On Sun, Mar 1, 2020 at 7:31 PM kant kodali <kanth...@gmail.com> wrote: > >> * What went wrong: >> Could not determine the dependencies of task ':shadowJar'. >> > Could not resolve all dependencies for configuration ':flinkShadowJar'. >> > Could not find >> org.apache.flink:flink-sql-connector-kafka_2.11:universal. >> Searched in the following locations: >> - >> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom >> - >> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar >> - >> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom >> - >> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar >> Required by: >> project : >> >> >> >> On Sun, Mar 1, 2020 at 6:43 AM Dawid Wysakowicz <dwysakow...@apache.org> >> wrote: >> >>> Hi Kant, >>> >>> If you want to use the *universal *kafka connector you use "universal" >>> for the version. The community decided to no longer distinguish different >>> kafka connector versions, but to use the newest kafka client version for >>> all versions of kafka 1.0+. So if you want to use the connector from >>> flink-sql-connector-kafka_2.11 use "universal" for the version. >>> >>> As for the collect/print sink. We do realize importance of the sink and >>> there were a few approaches to implement one. Including the TableUtils >>> mentioned by godfrey. It does not have strong consistency guarantees and is >>> recommended rather only for experiments/testing. There is also an ongoing >>> discussion how to implement such a sink for *both *batch and streaming >>> here: >>> https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17046455#comment-17046455 >>> >>> Best, >>> >>> Dawid >>> On 01/03/2020 12:00, kant kodali wrote: >>> >>> Hi Benchao, >>> >>> That worked! Pasting the build.gradle file here. However this only works >>> for 0.11 and it needs zookeeper.connect() which shouldn't be required. not >>> sure why it is required in Flink Kafka connector? If I change the version >>> to 2.2 in the code and specify this jar >>> >>> flinkShadowJar >>> "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}" >>> >>> or >>> >>> flinkShadowJar >>> "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" >>> //Not sure if I should use this one for Kafka >= 0.11 >>> >>> It doesn't work either. >>> >>> >>> buildscript { repositories { jcenter() // this applies only to >>> the Gradle 'Shadow' plugin } dependencies { classpath >>> 'com.github.jengelman.gradle.plugins:shadow:2.0.4' }}plugins { id >>> 'java' id 'application'}mainClassName = 'Test'apply plugin: >>> 'com.github.johnrengelman.shadow'ext { javaVersion = '1.8' >>> flinkVersion = '1.10.0' scalaBinaryVersion = '2.11' slf4jVersion = >>> '1.7.7' log4jVersion = '1.2.17'}sourceCompatibility = >>> javaVersiontargetCompatibility = javaVersiontasks.withType(JavaCompile) { >>> options.encoding = 'UTF-8'}applicationDefaultJvmArgs = >>> ["-Dlog4j.configuration=log4j.properties"] >>> // declare where to find the dependencies of your projectrepositories { >>> mavenCentral() >>> maven { url >>> "https://repository.apache.org/content/repositories/snapshots/" }}// NOTE: >>> We cannot use "compileOnly" or "shadow" configurations since then we could >>> not run code// in the IDE or with "gradle run". We also cannot exclude >>> transitive dependencies from the// shadowJar yet (see >>> https://github.com/johnrengelman/shadow/issues/159).// -> Explicitly define >>> the // libraries we want to be included in the "flinkShadowJar" >>> configuration!configurations { flinkShadowJar // dependencies which go >>> into the shadowJar // always exclude these (also from transitive >>> dependencies) since they are provided by Flink flinkShadowJar.exclude >>> group: 'org.apache.flink', module: 'force-shading' >>> flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305' >>> flinkShadowJar.exclude group: 'org.slf4j' flinkShadowJar.exclude >>> group: 'log4j'}// declare the dependencies for your production and test >>> codedependencies { // >>> -------------------------------------------------------------- // >>> Compile-time dependencies that should NOT be part of the // shadow jar >>> and are provided in the lib folder of Flink // >>> -------------------------------------------------------------- compile >>> "org.apache.flink:flink-java:${flinkVersion}" compile >>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" >>> // -------------------------------------------------------------- // >>> Dependencies that should be part of the shadow jar, e.g. // connectors. >>> These must be in the flinkShadowJar configuration! // >>> -------------------------------------------------------------- compile >>> "org.apache.flink:flink-java:${flinkVersion}" compile >>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" >>> flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}" // tried >>> doesnt work. same problem //flinkShadowJar >>> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}" // tried >>> doesnt work. same problem //flinkShadowJar >>> "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}" >>> //tried doesnt work. same problem flinkShadowJar >>> "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" >>> flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}" >>> compileOnly >>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}" >>> compileOnly >>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}" >>> flinkShadowJar >>> "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}" >>> flinkShadowJar >>> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}" >>> compile "log4j:log4j:${log4jVersion}" compile >>> "org.slf4j:slf4j-log4j12:${slf4jVersion}" // Add test dependencies here. >>> // testCompile "junit:junit:4.12" testImplementation >>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}" >>> testImplementation >>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"}// make >>> compileOnly dependencies available for tests:sourceSets { >>> main.compileClasspath += configurations.flinkShadowJar >>> main.runtimeClasspath += configurations.flinkShadowJar >>> >>> test.compileClasspath += configurations.flinkShadowJar >>> test.runtimeClasspath += configurations.flinkShadowJar >>> >>> javadoc.classpath += configurations.flinkShadowJar}run.classpath = >>> sourceSets.main.runtimeClasspathjar { manifest { attributes >>> 'Built-By': System.getProperty('user.name'), >>> 'Build-Jdk': System.getProperty('java.version') >>> }}shadowJar { configurations = >>> [project.configurations.flinkShadowJar] >>> mergeServiceFiles() >>> manifest { attributes 'Main-Class': mainClassName }} >>> >>> >>> >>> >>> >>> >>> On Sun, Mar 1, 2020 at 1:38 AM Benchao Li <libenc...@gmail.com> wrote: >>> >>>> I don't know how gradle works, but in Maven, packaging >>>> dependencies into one fat jar needs to specify how SPI property files >>>> should be dealt with, like >>>> >>>> <transformers> <transformer >>>> implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers> >>>> >>>> Could you check that your final jar contains correct resource file? >>>> >>>> godfrey he <godfre...@gmail.com> 于2020年3月1日周日 下午5:25写道: >>>> >>>>> I think you should use `flink-sql-connector-kafka*-0.11*_2.11` >>>>> instead of `flink-connector-kafka_2.11`. >>>>> >>>>> Bests, >>>>> Godfrey >>>>> >>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午5:15写道: >>>>> >>>>>> The dependency was already there. Below is my build.gradle. Also I >>>>>> checked the kafka version and looks like the jar >>>>>> >>>>>> flinkShadowJar >>>>>> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}" >>>>>> >>>>>> downloads kafka-clients version 2.2.0. So I changed my code to >>>>>> version 2.2.0 and same problem persists. >>>>>> >>>>>> buildscript { repositories { jcenter() // this applies only to >>>>>> the Gradle 'Shadow' plugin } dependencies { classpath >>>>>> 'com.github.jengelman.gradle.plugins:shadow:2.0.4' }}plugins { id >>>>>> 'java' id 'application'}mainClassName = 'Test'apply plugin: >>>>>> 'com.github.johnrengelman.shadow'ext { javaVersion = '1.8' >>>>>> flinkVersion = '1.10.0' scalaBinaryVersion = '2.11' slf4jVersion = >>>>>> '1.7.7' log4jVersion = '1.2.17'}sourceCompatibility = >>>>>> javaVersiontargetCompatibility = javaVersiontasks.withType(JavaCompile) >>>>>> { options.encoding = 'UTF-8'}applicationDefaultJvmArgs = >>>>>> ["-Dlog4j.configuration=log4j.properties"] >>>>>> // declare where to find the dependencies of your projectrepositories { >>>>>> mavenCentral() >>>>>> maven { url >>>>>> "https://repository.apache.org/content/repositories/snapshots/" }}// >>>>>> NOTE: We cannot use "compileOnly" or "shadow" configurations since then >>>>>> we could not run code// in the IDE or with "gradle run". We also cannot >>>>>> exclude transitive dependencies from the// shadowJar yet (see >>>>>> https://github.com/johnrengelman/shadow/issues/159).// -> Explicitly >>>>>> define the // libraries we want to be included in the "flinkShadowJar" >>>>>> configuration!configurations { flinkShadowJar // dependencies which >>>>>> go into the shadowJar // always exclude these (also from transitive >>>>>> dependencies) since they are provided by Flink flinkShadowJar.exclude >>>>>> group: 'org.apache.flink', module: 'force-shading' >>>>>> flinkShadowJar.exclude group: 'com.google.code.findbugs', module: >>>>>> 'jsr305' flinkShadowJar.exclude group: 'org.slf4j' >>>>>> flinkShadowJar.exclude group: 'log4j'}// declare the dependencies for >>>>>> your production and test codedependencies { // >>>>>> -------------------------------------------------------------- // >>>>>> Compile-time dependencies that should NOT be part of the // shadow >>>>>> jar and are provided in the lib folder of Flink // >>>>>> -------------------------------------------------------------- >>>>>> compile "org.apache.flink:flink-java:${flinkVersion}" compile >>>>>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" >>>>>> // -------------------------------------------------------------- >>>>>> // Dependencies that should be part of the shadow jar, e.g. // >>>>>> connectors. These must be in the flinkShadowJar configuration! // >>>>>> -------------------------------------------------------------- >>>>>> compile "org.apache.flink:flink-java:${flinkVersion}" compile >>>>>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" >>>>>> flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}" >>>>>> flinkShadowJar >>>>>> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}" >>>>>> flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}" >>>>>> compileOnly >>>>>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}" >>>>>> compileOnly >>>>>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}" >>>>>> flinkShadowJar >>>>>> "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}" >>>>>> flinkShadowJar >>>>>> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}" >>>>>> compile "log4j:log4j:${log4jVersion}" compile >>>>>> "org.slf4j:slf4j-log4j12:${slf4jVersion}" // Add test dependencies >>>>>> here. // testCompile "junit:junit:4.12" testImplementation >>>>>> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}" >>>>>> testImplementation >>>>>> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"}// >>>>>> make compileOnly dependencies available for tests:sourceSets { >>>>>> main.compileClasspath += configurations.flinkShadowJar >>>>>> main.runtimeClasspath += configurations.flinkShadowJar >>>>>> >>>>>> test.compileClasspath += configurations.flinkShadowJar >>>>>> test.runtimeClasspath += configurations.flinkShadowJar >>>>>> >>>>>> javadoc.classpath += configurations.flinkShadowJar}run.classpath = >>>>>> sourceSets.main.runtimeClasspathjar { manifest { attributes >>>>>> 'Built-By': System.getProperty('user.name'), >>>>>> 'Build-Jdk': System.getProperty('java.version') >>>>>> }}shadowJar { configurations = >>>>>> [project.configurations.flinkShadowJar]} >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> hi kant, >>>>>>> >>>>>>> > Also why do I need to convert to DataStream to print the rows of a >>>>>>> table? Why not have a print method in the Table itself? >>>>>>> Flink 1.10 introduces a utility class named TableUtils to convert a >>>>>>> Table to List<Row>, this utility class is mainly used for demonstration >>>>>>> or >>>>>>> testing and is only applicable for *small batch jobs* and small >>>>>>> finite *append only stream jobs*. code like: >>>>>>> Table table = tEnv.sqlQuery("select ..."); >>>>>>> List<Row> result = TableUtils.collectToList(table); >>>>>>> result..... >>>>>>> >>>>>>> currently, we are planner to implement Table#collect[1], after >>>>>>> that Table#head and Table#print may be also introduced soon. >>>>>>> >>>>>>> > The program finished with the following exception: >>>>>>> please make sure that the kafka version in Test class and the kafka >>>>>>> version in pom dependency are same. I tested your code successfully. >>>>>>> >>>>>>> Bests, >>>>>>> Godfrey >>>>>>> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-14807 >>>>>>> >>>>>>> >>>>>>> Benchao Li <libenc...@gmail.com> 于2020年3月1日周日 下午4:44写道: >>>>>>> >>>>>>>> Hi kant, >>>>>>>> >>>>>>>> CSV format is an independent module, you need to add it as your >>>>>>>> dependency. >>>>>>>> >>>>>>>> <dependency> <groupId>org.apache.flink</groupId> >>>>>>>> <artifactId>flink-csv</artifactId> >>>>>>>> <version>${flink.version}</version></dependency> >>>>>>>> >>>>>>>> >>>>>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 下午3:43写道: >>>>>>>> >>>>>>>>> ------------------------------------------------------------ >>>>>>>>> The program finished with the following exception: >>>>>>>>> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The >>>>>>>>> main method caused an error: findAndCreateTableSource failed. >>>>>>>>> at >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) >>>>>>>>> at >>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) >>>>>>>>> at >>>>>>>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) >>>>>>>>> at >>>>>>>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) >>>>>>>>> at >>>>>>>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) >>>>>>>>> at >>>>>>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) >>>>>>>>> at >>>>>>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) >>>>>>>>> at >>>>>>>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) >>>>>>>>> at >>>>>>>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) >>>>>>>>> Caused by: org.apache.flink.table.api.TableException: >>>>>>>>> findAndCreateTableSource failed. >>>>>>>>> at >>>>>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76) >>>>>>>>> at >>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) >>>>>>>>> at >>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) >>>>>>>>> at >>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) >>>>>>>>> at >>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) >>>>>>>>> at >>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) >>>>>>>>> at >>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) >>>>>>>>> at >>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) >>>>>>>>> at >>>>>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) >>>>>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >>>>>>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) >>>>>>>>> at Test.main(Test.java:34) >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>>>> at >>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>>>>> at >>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>>>> at >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) >>>>>>>>> ... 8 more >>>>>>>>> Caused by: >>>>>>>>> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not >>>>>>>>> find >>>>>>>>> a suitable table factory for >>>>>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in >>>>>>>>> the classpath. >>>>>>>>> >>>>>>>>> Reason: Required context properties mismatch. >>>>>>>>> >>>>>>>>> The matching candidates: >>>>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory >>>>>>>>> Mismatched properties: >>>>>>>>> 'connector.type' expects 'filesystem', but is 'kafka' >>>>>>>>> >>>>>>>>> The following properties are requested: >>>>>>>>> connector.property-version=1 >>>>>>>>> connector.topic=test-topic1 >>>>>>>>> connector.type=kafka >>>>>>>>> connector.version=0.11 >>>>>>>>> format.property-version=1 >>>>>>>>> format.type=csv >>>>>>>>> schema.0.data-type=VARCHAR(2147483647) >>>>>>>>> schema.0.name=f0 >>>>>>>>> update-mode=append >>>>>>>>> >>>>>>>>> The following factories have been considered: >>>>>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory >>>>>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory >>>>>>>>> at >>>>>>>>> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) >>>>>>>>> at >>>>>>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52) >>>>>>>>> ... 34 more >>>>>>>>> >>>>>>>>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski < >>>>>>>>> pi...@ververica.com> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> > Thanks for the pointer. Looks like the documentation says to >>>>>>>>>> use tableEnv.registerTableSink however in my IDE it shows the method >>>>>>>>>> is >>>>>>>>>> deprecated in Flink 1.10. >>>>>>>>>> >>>>>>>>>> It looks like not all of the documentation was updated after >>>>>>>>>> methods were deprecated. However if you look at the java docs of the >>>>>>>>>> `registerTableSink` method, you can find an answer [1]. >>>>>>>>>> >>>>>>>>>> > It sill doesnt work because it says for CSV the >>>>>>>>>> connector.type should be filesystem not Kafka. >>>>>>>>>> >>>>>>>>>> Can you post the full stack trace? As I’m not familiar with the >>>>>>>>>> Table API, maybe you Timo or Dawid know what’s going on here? >>>>>>>>>> >>>>>>>>>> Piotrek >>>>>>>>>> >>>>>>>>>> [1] >>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink- >>>>>>>>>> >>>>>>>>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>> Here is my updated code after digging through the source code >>>>>>>>>> (not sure if it is correct ). It sill doesnt work because it says >>>>>>>>>> for CSV >>>>>>>>>> the connector.type should be filesystem not Kafka but documentation >>>>>>>>>> says it >>>>>>>>>> is supported. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> import >>>>>>>>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend;import >>>>>>>>>> org.apache.flink.runtime.state.StateBackend;import >>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import >>>>>>>>>> org.apache.flink.table.api.DataTypes;import >>>>>>>>>> org.apache.flink.table.api.EnvironmentSettings;import >>>>>>>>>> org.apache.flink.table.api.Table;import >>>>>>>>>> org.apache.flink.table.api.java.StreamTableEnvironment;import >>>>>>>>>> org.apache.flink.table.descriptors.Csv;import >>>>>>>>>> org.apache.flink.table.descriptors.Kafka;import >>>>>>>>>> org.apache.flink.table.descriptors.Schema;public class Test { >>>>>>>>>> >>>>>>>>>> public static void main(String... args) throws Exception { >>>>>>>>>> EnvironmentSettings settings = >>>>>>>>>> EnvironmentSettings.newInstance() >>>>>>>>>> .useBlinkPlanner() >>>>>>>>>> .inStreamingMode() >>>>>>>>>> .build(); StreamExecutionEnvironment >>>>>>>>>> streamExecutionEnvironment = >>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>>>>> streamExecutionEnvironment.setStateBackend((StateBackend) new >>>>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb")); >>>>>>>>>> StreamTableEnvironment tableEnvironment = >>>>>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings); >>>>>>>>>> tableEnvironment >>>>>>>>>> .connect( >>>>>>>>>> new Kafka() >>>>>>>>>> .version("0.11") >>>>>>>>>> .topic("test-topic1") >>>>>>>>>> ) >>>>>>>>>> .withFormat(new Csv()) >>>>>>>>>> .withSchema(new Schema().field("f0", DataTypes.STRING())) >>>>>>>>>> .inAppendMode() >>>>>>>>>> .createTemporaryTable("kafka_source"); Table >>>>>>>>>> resultTable = tableEnvironment.sqlQuery("select * from >>>>>>>>>> kafka_source"); tableEnvironment >>>>>>>>>> .connect( >>>>>>>>>> new Kafka() >>>>>>>>>> .version("0.11") >>>>>>>>>> .topic("test-topic2") >>>>>>>>>> ) >>>>>>>>>> .withFormat(new Csv()) >>>>>>>>>> .withSchema(new Schema().field("f0", DataTypes.STRING())) >>>>>>>>>> .inAppendMode() >>>>>>>>>> .createTemporaryTable("kafka_target"); >>>>>>>>>> tableEnvironment.insertInto("kafka_target", resultTable); >>>>>>>>>> tableEnvironment.execute("Sample Job"); } >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Benchao, >>>>>>>>>>> >>>>>>>>>>> Agreed a ConsoleSink is very useful but that is not the only >>>>>>>>>>> problem here. Documentation says use tableEnv.registerTableSink >>>>>>>>>>> all over >>>>>>>>>>> the place >>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink >>>>>>>>>>> however that function is deprecated. so how do I add any other Sink? >>>>>>>>>>> >>>>>>>>>>> Thanks! >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi kant, >>>>>>>>>>>> >>>>>>>>>>>> AFAIK, there is no "print to stdout" sink for Table API now, >>>>>>>>>>>> you can implement one custom sink following this doc[1]. >>>>>>>>>>>> >>>>>>>>>>>> IMO, an out-of-box print table sink is very useful, and I've >>>>>>>>>>>> created an issue[2] to track this. >>>>>>>>>>>> >>>>>>>>>>>> [1] >>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink >>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354 >>>>>>>>>>>> >>>>>>>>>>>> kant kodali <kanth...@gmail.com> 于2020年3月1日周日 上午2:30写道: >>>>>>>>>>>> >>>>>>>>>>>>> Hi, >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks for the pointer. Looks like the documentation says to >>>>>>>>>>>>> use tableEnv.registerTableSink however in my IDE it shows the >>>>>>>>>>>>> method is >>>>>>>>>>>>> deprecated in Flink 1.10. so I am still not seeing a way to add a >>>>>>>>>>>>> sink that >>>>>>>>>>>>> can print to stdout? what sink should I use to print to stdout >>>>>>>>>>>>> and how do I >>>>>>>>>>>>> add it without converting into DataStream? >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks! >>>>>>>>>>>>> >>>>>>>>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski < >>>>>>>>>>>>> pi...@ververica.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>> >>>>>>>>>>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked >>>>>>>>>>>>>> @Internal. It’s not part of any public API. >>>>>>>>>>>>>> >>>>>>>>>>>>>> You don’t have to convert DataStream into Table to read from >>>>>>>>>>>>>> Kafka in Table API. I guess you could, if you had used >>>>>>>>>>>>>> DataStream API’s >>>>>>>>>>>>>> FlinkKafkaConsumer as it’s documented here [1]. >>>>>>>>>>>>>> >>>>>>>>>>>>>> But you should be able to use Kafka Table connector directly, >>>>>>>>>>>>>> as it is described in the docs [2][3]. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Piotrek >>>>>>>>>>>>>> >>>>>>>>>>>>>> [1] >>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html >>>>>>>>>>>>>> [2] >>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview >>>>>>>>>>>>>> [3] >>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Also why do I need to convert to DataStream to print the rows >>>>>>>>>>>>>> of a table? Why not have a print method in the Table itself? >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali < >>>>>>>>>>>>>> kanth...@gmail.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi All, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Do I need to use DataStream API or Table API to construct >>>>>>>>>>>>>>> sources? I am just trying to read from Kafka and print it to >>>>>>>>>>>>>>> console. And >>>>>>>>>>>>>>> yes I tried it with datastreams and it works fine but I want to >>>>>>>>>>>>>>> do it using >>>>>>>>>>>>>>> Table related APIs. I don't see any documentation or a sample >>>>>>>>>>>>>>> on how to >>>>>>>>>>>>>>> create Kafka table source or any other source using Table >>>>>>>>>>>>>>> Source API's so >>>>>>>>>>>>>>> after some digging I wrote the following code. My ultimate goal >>>>>>>>>>>>>>> is to avoid >>>>>>>>>>>>>>> Datastream API as much as possible and just use Table API & SQL >>>>>>>>>>>>>>> but somehow >>>>>>>>>>>>>>> I feel the Flink framework focuses on DataStream than the SQL >>>>>>>>>>>>>>> interface. am >>>>>>>>>>>>>>> I wrong? From the user perspective wouldn't it make more sense >>>>>>>>>>>>>>> to focus on >>>>>>>>>>>>>>> SQL interfaces for both streaming and batch? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> import >>>>>>>>>>>>>>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;import >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSource;import >>>>>>>>>>>>>>> org.apache.flink.table.api.DataTypes;import >>>>>>>>>>>>>>> org.apache.flink.table.api.EnvironmentSettings;import >>>>>>>>>>>>>>> org.apache.flink.table.api.Table;import >>>>>>>>>>>>>>> org.apache.flink.table.api.TableSchema;import >>>>>>>>>>>>>>> org.apache.flink.table.api.java.StreamTableEnvironment;import >>>>>>>>>>>>>>> org.apache.flink.table.sources.TableSource;import >>>>>>>>>>>>>>> org.apache.flink.types.Row;import java.io.IOException;import >>>>>>>>>>>>>>> java.util.Properties;public class Test { >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> public class MyDeserializationSchema extends >>>>>>>>>>>>>>> AbstractDeserializationSchema<Row> { >>>>>>>>>>>>>>> @Override public Row deserialize(byte[] message) >>>>>>>>>>>>>>> throws IOException { >>>>>>>>>>>>>>> return Row.of(new String(message)); } >>>>>>>>>>>>>>> } >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> public static void main(String... args) throws Exception { >>>>>>>>>>>>>>> Test test = new Test(); EnvironmentSettings >>>>>>>>>>>>>>> settings = EnvironmentSettings.newInstance() >>>>>>>>>>>>>>> .useBlinkPlanner() >>>>>>>>>>>>>>> .inStreamingMode() >>>>>>>>>>>>>>> .build(); StreamExecutionEnvironment >>>>>>>>>>>>>>> streamExecutionEnvironment = >>>>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>>>>>>>>>> StreamTableEnvironment tableEnvironment = >>>>>>>>>>>>>>> StreamTableEnvironment.create(streamExecutionEnvironment, >>>>>>>>>>>>>>> settings); TableSource tableSource = >>>>>>>>>>>>>>> test.getKafkaTableSource(); Table kafkaTable = >>>>>>>>>>>>>>> tableEnvironment.fromTableSource(tableSource); >>>>>>>>>>>>>>> tableEnvironment.createTemporaryView("kafka_source", >>>>>>>>>>>>>>> kafkaTable); Table resultTable = >>>>>>>>>>>>>>> tableEnvironment.sqlQuery("select * from kafka_source"); >>>>>>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, >>>>>>>>>>>>>>> Row.class).print(); >>>>>>>>>>>>>>> streamExecutionEnvironment.execute("Sample Job"); } >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> public KafkaTableSource getKafkaTableSource() { >>>>>>>>>>>>>>> TableSchema tableSchema = >>>>>>>>>>>>>>> TableSchema.builder().field("f0", DataTypes.STRING()).build(); >>>>>>>>>>>>>>> Properties properties = new Properties(); >>>>>>>>>>>>>>> properties.setProperty("bootstrap.servers", "localhost:9092"); >>>>>>>>>>>>>>> properties.setProperty("group.id", "test"); return >>>>>>>>>>>>>>> new KafkaTableSource(tableSchema, "test-topic1", properties, >>>>>>>>>>>>>>> new MyDeserializationSchema()); } >>>>>>>>>>>>>>> } >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I get the following error >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The program finished with the following exception: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not >>>>>>>>>>>>>>> serializable. The object probably contains or references non >>>>>>>>>>>>>>> serializable >>>>>>>>>>>>>>> fields. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >>>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >>>>>>>>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250) >>>>>>>>>>>>>>> Test.main(Test.java:40) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The error seems to be on the line >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print(); >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> and I am not sure why it is not able to serialize? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks! >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> >>>>>>>>>>>> Benchao Li >>>>>>>>>>>> School of Electronics Engineering and Computer Science, Peking >>>>>>>>>>>> UniversityTel:+86-15650713730 >>>>>>>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> >>>>>>>> Benchao Li >>>>>>>> School of Electronics Engineering and Computer Science, Peking >>>>>>>> UniversityTel:+86-15650713730 >>>>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn >>>>>>>> >>>>>>>> >>>> >>>> -- >>>> >>>> Benchao Li >>>> School of Electronics Engineering and Computer Science, Peking >>>> UniversityTel:+86-15650713730 >>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn >>>> >>>>