Hey, Could You please show a sample data that You want to process? This would help in verifying the issue.
Best Regards, Dom. wt., 13 lis 2018 o 13:58 Jeff Zhang <zjf...@gmail.com> napisał(a): > Hi, > > I hit the following error when I try to use kafka connector in flink table > api. There's very little document about how to use kafka connector in flink > table api, could anyone help me on that ? Thanks > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Field 'event_ts' could not be resolved by the field mapping. > at > org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) > at > org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521) > at > org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127) > at > org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33) > at > org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150) > at > org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541) > at > org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47) > at > org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68) > > And here's the source code: > > > > case class Record(status: String, direction: String, var event_ts: Timestamp) > > > def main(args: Array[String]): Unit = { > val senv = StreamExecutionEnvironment.getExecutionEnvironment > senv.setParallelism(1) > senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > > val data: DataStream[Record] = ... > val tEnv = TableEnvironment.getTableEnvironment(senv) > tEnv > // declare the external system to connect to > .connect( > new Kafka() > .version("0.11") > .topic("processed5.events") > .startFromEarliest() > .property("zookeeper.connect", "localhost:2181") > .property("bootstrap.servers", "localhost:9092")) > .withFormat(new Json() > .failOnMissingField(false) > .deriveSchema() > ) > .withSchema( > new Schema() > .field("status", Types.STRING) > .field("direction", Types.STRING) > .field("event_ts", Types.SQL_TIMESTAMP).rowtime( > new > Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending()) > ) > > // specify the update-mode for streaming tables > .inAppendMode() > > // register as source, sink, or both and under a name > .registerTableSourceAndSink("MyUserTable"); > > tEnv.fromDataStream(data).insertInto("MyUserTable") > >