* What went wrong:
Could not determine the dependencies of task ':shadowJar'.
> Could not resolve all dependencies for configuration ':flinkShadowJar'.
   > Could not find org.apache.flink:flink-sql-connector-kafka_2.11:universal.
     Searched in the following locations:
       - https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
       - https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
       - https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
       - https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
     Required by:
         project :
On Sun, Mar 1, 2020 at 6:43 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Kant,

If you want to use the universal Kafka connector, you use "universal" for the version. The community decided to no longer distinguish between different Kafka connector versions, but to use the newest Kafka client version for all versions of Kafka 1.0+. So if you want to use the connector from flink-sql-connector-kafka_2.11, use "universal" for the version.
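(Concretely: the Maven/Gradle artifact version stays the Flink release, and "universal" only appears as the Kafka version inside the program or DDL. A sketch against the build file below, assuming Flink 1.10.0:)

    flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}" // flinkVersion = '1.10.0', not 'universal'

and, in the Table API descriptor:

    new Kafka().version("universal").topic("test-topic1")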
As for the collect/print sink: we do realize the importance of such a sink, and there have been a few approaches to implement one, including the TableUtils mentioned by Godfrey. It does not have strong consistency guarantees and is recommended only for experiments/testing. There is also an ongoing discussion on how to implement such a sink for both batch and streaming here:
https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17046455#comment-17046455

Best,
Dawid

On 01/03/2020 12:00, kant kodali wrote:

Hi Benchao,

That worked! Pasting the build.gradle file here. However, this only works for 0.11, and it needs zookeeper.connect(), which shouldn't be required; I am not sure why the Flink Kafka connector requires it. If I change the version to 2.2 in the code and specify this jar

    flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"

or

    flinkShadowJar "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" // not sure if I should use this one for Kafka >= 0.11

it doesn't work either.

buildscript {
    repositories {
        jcenter() // this applies only to the Gradle 'Shadow' plugin
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
    }
}

plugins {
    id 'java'
    id 'application'
}

mainClassName = 'Test'
apply plugin: 'com.github.johnrengelman.shadow'

ext {
    javaVersion = '1.8'
    flinkVersion = '1.10.0'
    scalaBinaryVersion = '2.11'
    slf4jVersion = '1.7.7'
    log4jVersion = '1.2.17'
}

sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
    options.encoding = 'UTF-8'
}

applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]

// declare where to find the dependencies of your project
repositories {
    mavenCentral()
    maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}

// NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not
// run code in the IDE or with "gradle run". We also cannot exclude transitive
// dependencies from the shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
// -> Explicitly define the libraries we want to be included in the "flinkShadowJar" configuration!
configurations {
    flinkShadowJar // dependencies which go into the shadowJar

    // always exclude these (also from transitive dependencies) since they are provided by Flink
    flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
    flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
    flinkShadowJar.exclude group: 'org.slf4j'
    flinkShadowJar.exclude group: 'log4j'
}

// declare the dependencies for your production and test code
dependencies {
    // --------------------------------------------------------------
    // Compile-time dependencies that should NOT be part of the
    // shadow jar and are provided in the lib folder of Flink
    // --------------------------------------------------------------
    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"

    // --------------------------------------------------------------
    // Dependencies that should be part of the shadow jar, e.g.
    // connectors. These must be in the flinkShadowJar configuration!
    // --------------------------------------------------------------
    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}" // tried, doesn't work. same problem
    //flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}" // tried, doesn't work. same problem
    //flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}" // tried, doesn't work. same problem
    flinkShadowJar "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
    compileOnly "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
    compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
    compile "log4j:log4j:${log4jVersion}"
    compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"

    // Add test dependencies here.
    // testCompile "junit:junit:4.12"
    testImplementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
    testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
}

// make compileOnly dependencies available for tests:
sourceSets {
    main.compileClasspath += configurations.flinkShadowJar
    main.runtimeClasspath += configurations.flinkShadowJar

    test.compileClasspath += configurations.flinkShadowJar
    test.runtimeClasspath += configurations.flinkShadowJar

    javadoc.classpath += configurations.flinkShadowJar
}

run.classpath = sourceSets.main.runtimeClasspath

jar {
    manifest {
        attributes 'Built-By': System.getProperty('user.name'),
                'Build-Jdk': System.getProperty('java.version')
    }
}

shadowJar {
    configurations = [project.configurations.flinkShadowJar]
    mergeServiceFiles()
    manifest {
        attributes 'Main-Class': mainClassName
    }
}

On Sun, Mar 1, 2020 at 1:38 AM Benchao Li <libenc...@gmail.com> wrote:

I don't know how Gradle works, but in Maven, packaging dependencies into one fat jar needs to specify how SPI property files should be dealt with, like:

    <transformers>
        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    </transformers>

Could you check that your final jar contains the correct resource file?
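(With the Shadow plugin, the counterpart of Maven's ServicesResourceTransformer should be mergeServiceFiles(); without it, the META-INF/services/org.apache.flink.table.factories.TableFactory entries from the various Flink jars overwrite one another and factory entries go missing from the fat jar. A sketch, using the flinkShadowJar configuration from the build file above:)

    shadowJar {
        configurations = [project.configurations.flinkShadowJar]
        // merge META-INF/services files from all jars instead of keeping only the last one,
        // so every TableFactory SPI entry survives in the fat jar
        mergeServiceFiles()
    }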
On Sunday, March 1, 2020 at 5:25 PM, godfrey he <godfre...@gmail.com> wrote:

I think you should use `flink-sql-connector-kafka-0.11_2.11` instead of `flink-connector-kafka_2.11`.

Bests,
Godfrey

On Sunday, March 1, 2020 at 5:15 PM, kant kodali <kanth...@gmail.com> wrote:

The dependency was already there. Below is my build.gradle. Also, I checked the Kafka version, and it looks like the jar

    flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"

downloads kafka-clients version 2.2.0. So I changed my code to version 2.2.0 and the same problem persists.

buildscript {
    repositories {
        jcenter() // this applies only to the Gradle 'Shadow' plugin
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
    }
}

plugins {
    id 'java'
    id 'application'
}

mainClassName = 'Test'
apply plugin: 'com.github.johnrengelman.shadow'

ext {
    javaVersion = '1.8'
    flinkVersion = '1.10.0'
    scalaBinaryVersion = '2.11'
    slf4jVersion = '1.7.7'
    log4jVersion = '1.2.17'
}

sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
    options.encoding = 'UTF-8'
}

applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]

// declare where to find the dependencies of your project
repositories {
    mavenCentral()
    maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}

// NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not
// run code in the IDE or with "gradle run". We also cannot exclude transitive
// dependencies from the shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
// -> Explicitly define the libraries we want to be included in the "flinkShadowJar" configuration!
configurations {
    flinkShadowJar // dependencies which go into the shadowJar

    // always exclude these (also from transitive dependencies) since they are provided by Flink
    flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
    flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
    flinkShadowJar.exclude group: 'org.slf4j'
    flinkShadowJar.exclude group: 'log4j'
}

// declare the dependencies for your production and test code
dependencies {
    // --------------------------------------------------------------
    // Compile-time dependencies that should NOT be part of the
    // shadow jar and are provided in the lib folder of Flink
    // --------------------------------------------------------------
    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"

    // --------------------------------------------------------------
    // Dependencies that should be part of the shadow jar, e.g.
    // connectors. These must be in the flinkShadowJar configuration!
    // --------------------------------------------------------------
    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
    compileOnly "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
    compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
    compile "log4j:log4j:${log4jVersion}"
    compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"

    // Add test dependencies here.
    // testCompile "junit:junit:4.12"
    testImplementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
    testImplementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
}

// make compileOnly dependencies available for tests:
sourceSets {
    main.compileClasspath += configurations.flinkShadowJar
    main.runtimeClasspath += configurations.flinkShadowJar

    test.compileClasspath += configurations.flinkShadowJar
    test.runtimeClasspath += configurations.flinkShadowJar

    javadoc.classpath += configurations.flinkShadowJar
}

run.classpath = sourceSets.main.runtimeClasspath

jar {
    manifest {
        attributes 'Built-By': System.getProperty('user.name'),
                'Build-Jdk': System.getProperty('java.version')
    }
}

shadowJar {
    configurations = [project.configurations.flinkShadowJar]
}
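(Worth noting: unlike the build file in the newer message above, the shadowJar block here does not call mergeServiceFiles(), which matches Benchao's SPI diagnosis. The likely missing piece:)

    shadowJar {
        configurations = [project.configurations.flinkShadowJar]
        mergeServiceFiles() // absent here, present in the build file that worked
    }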
On Sun, Mar 1, 2020 at 12:50 AM godfrey he <godfre...@gmail.com> wrote:

hi kant,

> Also why do I need to convert to DataStream to print the rows of a table? Why not have a print method in the Table itself?

Flink 1.10 introduces a utility class named TableUtils to convert a Table to List<Row>. This utility class is mainly used for demonstration or testing and is only applicable to small batch jobs and small finite append-only stream jobs. Code like:

    Table table = tEnv.sqlQuery("select ...");
    List<Row> result = TableUtils.collectToList(table);
    // work with result ...
Currently we are planning to implement Table#collect [1]; after that, Table#head and Table#print may also be introduced soon.

> The program finished with the following exception:

Please make sure that the Kafka version in the Test class and the Kafka version in the pom dependency are the same. I tested your code successfully.

Bests,
Godfrey

[1] https://issues.apache.org/jira/browse/FLINK-14807

On Sunday, March 1, 2020 at 4:44 PM, Benchao Li <libenc...@gmail.com> wrote:

Hi kant,

CSV format is an independent module; you need to add it as a dependency:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
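(In the Gradle builds above, the equivalent line goes into the flinkShadowJar configuration so the format jar ends up in the fat jar:)

    flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"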
On Sunday, March 1, 2020 at 3:43 PM, kant kodali <kanth...@gmail.com> wrote:

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
    at Test.main(Test.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 8 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.
The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.property-version=1
connector.topic=test-topic1
connector.type=kafka
connector.version=0.11
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=f0
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
    ... 34 more

On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

> Thanks for the pointer. Looks like the documentation says to use tableEnv.registerTableSink however in my IDE it shows the method is deprecated in Flink 1.10.

It looks like not all of the documentation was updated after methods were deprecated. However, if you look at the Javadocs of the `registerTableSink` method, you can find an answer [1].

> It still doesn't work, because it says that for CSV the connector.type should be filesystem, not Kafka.

Can you post the full stack trace? As I'm not familiar with the Table API, maybe Timo or Dawid know what's going on here?

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-

On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:

Here is my updated code after digging through the source code (not sure if it is correct). It still doesn't work, because it says that for CSV the connector.type should be filesystem, not Kafka, but the documentation says it is supported.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class Test {

    public static void main(String... args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setStateBackend(
                (StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
        StreamTableEnvironment tableEnvironment =
                StreamTableEnvironment.create(streamExecutionEnvironment, settings);

        tableEnvironment
                .connect(new Kafka().version("0.11").topic("test-topic1"))
                .withFormat(new Csv())
                .withSchema(new Schema().field("f0", DataTypes.STRING()))
                .inAppendMode()
                .createTemporaryTable("kafka_source");

        Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");

        tableEnvironment
                .connect(new Kafka().version("0.11").topic("test-topic2"))
                .withFormat(new Csv())
                .withSchema(new Schema().field("f0", DataTypes.STRING()))
                .inAppendMode()
                .createTemporaryTable("kafka_target");

        tableEnvironment.insertInto("kafka_target", resultTable);
        tableEnvironment.execute("Sample Job");
    }
}
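(Putting the answers in this thread together, a sketch of how the source declaration above would look with the "universal" connector version and explicit Kafka client properties; the broker address and group id are assumptions for local testing:)

    tableEnvironment
            .connect(new Kafka()
                    .version("universal")                            // per Dawid's reply above, instead of "0.11"
                    .topic("test-topic1")
                    .property("bootstrap.servers", "localhost:9092") // assumed local broker
                    .property("group.id", "test"))                   // assumed consumer group
            .withFormat(new Csv())
            .withSchema(new Schema().field("f0", DataTypes.STRING()))
            .inAppendMode()
            .createTemporaryTable("kafka_source");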
On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> wrote:

Hi Benchao,

Agreed, a ConsoleSink would be very useful, but that is not the only problem here. The documentation says to use tableEnv.registerTableSink all over the place (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink), yet that method is deprecated. So how do I add any other sink?

Thanks!

On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote:

Hi kant,

AFAIK, there is no "print to stdout" sink for the Table API now; you can implement a custom sink following this doc [1].

IMO, an out-of-the-box print table sink would be very useful, and I've created an issue [2] to track this.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
[2] https://issues.apache.org/jira/browse/FLINK-16354
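(Until such a sink exists, the documented workaround in streaming mode is to convert the Table into a DataStream and print that, e.g.:)

    Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
    // append-only stream of Rows, printed to stdout by the DataStream print() sink
    tableEnvironment.toAppendStream(resultTable, Row.class).print();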
On Sunday, March 1, 2020 at 2:30 AM, kant kodali <kanth...@gmail.com> wrote:

Hi,

Thanks for the pointer. The documentation says to use tableEnv.registerTableSink, however in my IDE the method shows as deprecated in Flink 1.10, so I still don't see a way to add a sink that can print to stdout. What sink should I use to print to stdout, and how do I add it without converting into a DataStream?

Thanks!

On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

You shouldn't be using `KafkaTableSource`, as it's marked @Internal. It's not part of any public API.

You don't have to convert a DataStream into a Table to read from Kafka in the Table API. I guess you could, if you had used the DataStream API's FlinkKafkaConsumer as it's documented here [1]. But you should be able to use the Kafka Table connector directly, as it is described in the docs [2][3].

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:

Also, why do I need to convert to a DataStream to print the rows of a table? Why not have a print method on the Table itself?

On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:

Hi All,

Do I need to use the DataStream API or the Table API to construct sources? I am just trying to read from Kafka and print it to the console. And yes, I tried it with data streams and it works fine, but I want to do it using the Table-related APIs. I don't see any documentation or a sample on how to create a Kafka table source, or any other source, using the table source APIs, so after some digging I wrote the following code. My ultimate goal is to avoid the DataStream API as much as possible and just use the Table API & SQL, but somehow I feel the Flink framework focuses more on DataStream than on the SQL interface. Am I wrong? From the user's perspective, wouldn't it make more sense to focus on SQL interfaces for both streaming and batch?

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.util.Properties;

public class Test {

    public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
        @Override
        public Row deserialize(byte[] message) throws IOException {
            return Row.of(new String(message));
        }
    }

    public static void main(String... args) throws Exception {
        Test test = new Test();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment =
                StreamTableEnvironment.create(streamExecutionEnvironment, settings);
        TableSource tableSource = test.getKafkaTableSource();
        Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
        tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
        Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
        tableEnvironment.toAppendStream(resultTable, Row.class).print();
        streamExecutionEnvironment.execute("Sample Job");
    }

    public KafkaTableSource getKafkaTableSource() {
        TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        return new KafkaTableSource(tableSchema, "test-topic1", properties, new MyDeserializationSchema());
    }
}

I get the following error:

The program finished with the following exception:

The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non-serializable fields.
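(The likely cause: MyDeserializationSchema is a non-static inner class, so each instance holds a hidden reference to the enclosing Test object, and Test itself is not Serializable, which makes the Kafka consumer fail the serializability check. A minimal fix, assuming nothing else needs the outer instance, is to declare the class static:)

    public static class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
        @Override
        public Row deserialize(byte[] message) throws IOException {
            return Row.of(new String(message));
        }
    }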
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.Iterator$class.foreach(Iterator.scala:891)
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
scala.collection.AbstractTraversable.map(Traversable.scala:104)
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
Test.main(Test.java:40)

The error seems to be on the line

    tableEnvironment.toAppendStream(resultTable, Row.class).print();

and I am not sure why it is not able to serialize?

Thanks!
--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn