Given the popularity of Flink SQL and of Kafka as a streaming source, I think we can add some examples of using Kafka[XXX]TableSource to the flink-examples/flink-examples-table module. What do you guys think?

Cheers
Shuyi

On Mon, Jun 4, 2018 at 12:57 AM, Timo Walther <twal...@apache.org> wrote:

> Hi,
>
> as you can see in the code [1], Kafka09JsonTableSource takes a TableSchema.
> You can create a table schema from type information, see [2].
>
> Regards,
> Timo
>
> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
> [2] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
>
> On 02.06.18 at 18:31, Radhya Sahal wrote:
>
>> Thanks Rong,
>>
>> I used Flink 1.3.0. If I use Flink 1.5, how can I define the jsonSchema?
>>
>> Yes, there were two names, but now I have put only one name, and I want
>> to define the jsonSchema.
>>
>> Rong Rong wrote
>>
>>> Hi Radhya,
>>>
>>> Can you tell us which Flink version you are using? In the latest
>>> Flink 1.5 release, Kafka09JsonTableSource takes:
>>>
>>>     /**
>>>      * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
>>>      *
>>>      * @param topic       Kafka topic to consume.
>>>      * @param properties  Properties for the Kafka consumer.
>>>      * @param tableSchema The schema of the table.
>>>      * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
>>>      */
>>>
>>> Also, in your type definition
>>>
>>>     TypeInformation<Row> typeInfo2 = Types.ROW(...
>>>
>>> the arguments seem to have different lengths for schema names and types.
>>>
>>> Thanks,
>>> Rong
>>>
>>> On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <radhya.sahal@> wrote:
>>>
>>>> Hi,
>>>>
>>>> Could anyone help me solve this problem?
>>>>
>>>> Exception in thread "main" java.lang.Error: Unresolved compilation problem:
>>>>     The constructor Kafka09JsonTableSource(String, Properties, TypeInformation<Row>) is undefined
>>>>
>>>> This is the code:
>>>>
>>>> public class FlinkKafkaSQL {
>>>>     public static void main(String[] args) throws Exception {
>>>>         // Read parameters from command line
>>>>         final ParameterTool params = ParameterTool.fromArgs(args);
>>>>
>>>>         if(params.getNumberOfParameters() < 5) {
>>>>             System.out.println("\nUsage: FlinkReadKafka " +
>>>>                 "--read-topic <topic> " +
>>>>                 "--write-topic <topic> " +
>>>>                 "--bootstrap.servers <kafka brokers> " +
>>>>                 "zookeeper.connect" +
>>>>                 "--group.id <groupid>");
>>>>             return;
>>>>         }
>>>>
>>>>         // setup streaming environment
>>>>         StreamExecutionEnvironment env =
>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>         env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
>>>>         env.enableCheckpointing(300000); // 300 seconds
>>>>         env.getConfig().setGlobalJobParameters(params);
>>>>
>>>>         StreamTableEnvironment tableEnv =
>>>>             TableEnvironment.getTableEnvironment(env);
>>>>
>>>>         // specify JSON field names and types
>>>>         TypeInformation<Row> typeInfo2 = Types.ROW(
>>>>             new String[] { "iotdevice", "sensorID" },
>>>>             new TypeInformation<?>[] { Types.STRING()}
>>>>         );
>>>>
>>>>         // create a new tablesource of JSON from kafka
>>>>         KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
>>>>             params.getRequired("read-topic"),
>>>>             params.getProperties(),
>>>>             typeInfo2);
>>>>
>>>>         // run some SQL to filter results where a key is not null
>>>>         String sql = "SELECT sensorID " +
>>>>                      "FROM iotdevice ";
>>>>         tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>>>>         Table result = tableEnv.sql(sql);
>>>>
>>>>         // create a partition for the data going into kafka
>>>>         FlinkFixedPartitioner partition = new FlinkFixedPartitioner();
>>>>
>>>>         // create new tablesink of JSON to kafka
>>>>         KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>>>             params.getRequired("write-topic"),
>>>>             params.getProperties(),
>>>>             partition);
>>>>
>>>>         result.writeToSink(kafkaTableSink);
>>>>
>>>>         env.execute("FlinkReadWriteKafkaJSON");
>>>>     }
>>>> }
>>>>
>>>> These are the dependencies in pom.xml:
>>>>
>>>> <dependencies>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-java</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-streaming-java_2.11</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-clients_2.11</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-connector-kafka-0.9</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-table_2.11</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-core</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>> </dependencies>
>>>>
>>>> Regards.

--
"So you have to trust that the dots will somehow connect in your future."
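
Rong's remark about the mismatched lengths of schema names and types can be illustrated without Flink. The following is a minimal sketch in plain Java, not Flink code: the class `SchemaCheck` and the method `pairFields` are hypothetical names introduced here, and the type names are plain strings standing in for `TypeInformation` objects. It only shows the check that a row definition has to pass, namely one type per field name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaCheck {

    /**
     * Pairs each field name with its type name, in declaration order.
     * Rejects the input when the two arrays differ in length.
     * (Hypothetical helper for illustration; not part of Flink.)
     */
    static Map<String, String> pairFields(String[] names, String[] types) {
        if (names.length != types.length) {
            throw new IllegalArgumentException(
                "Got " + names.length + " field names but "
                    + types.length + " field types");
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], types[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Consistent definition: two names, two types.
        System.out.println(pairFields(
            new String[] {"iotdevice", "sensorID"},
            new String[] {"STRING", "STRING"}));

        // Same shape as the posted code: two names, but only one type.
        try {
            pairFields(
                new String[] {"iotdevice", "sensorID"},
                new String[] {"STRING"});
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In the posted job, the equivalent fix would be either to pass one type per name to `Types.ROW` or to drop the extra name, so that both arrays have the same length.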