Hi, I've pulled in Timo, who can help you with your problem.
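
In the meantime, a first guess until Timo has had a look: the exception says that no factory implements 'org.apache.flink.table.factories.DeserializationSchemaFactory' and only the CSV and Kafka factories were considered. That usually means the JSON format module (flink-json, which provides the factory behind .withFormat(new Json())) is on your classpath when you run locally from the IDE, but not on the cluster. A minimal sketch of the dependency section, assuming you build with sbt and run Flink 1.6.x (adjust the version to your release):

    // build.sbt -- sketch only; the version number is an assumption, use your cluster's release
    val flinkVersion = "1.6.1"

    libraryDependencies ++= Seq(
      // the Kafka 0.11 table connector you are already using
      "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion,
      // the JSON format factory; if this jar is missing from your job jar (or from
      // the cluster's lib/ folder), only the CSV factories from your error are found
      "org.apache.flink" %  "flink-json" % flinkVersion
    )

Alternatively, you can place the flink-json jar into the lib/ directory of every Flink node instead of bundling it with your job.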
Cheers,
Till

On Tue, Sep 25, 2018 at 12:02 PM clay4444 <clay4me...@gmail.com> wrote:

> Hi,
>
> I am using Flink's Table API. I receive data from Kafka, register it as a
> table, process it with a SQL statement, and finally convert the result back
> to a stream and write it to a directory. The code looks like this:
>
>   def main(args: Array[String]): Unit = {
>
>     val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>     val tEnv = TableEnvironment.getTableEnvironment(sEnv)
>
>     tEnv.connect(
>         new Kafka()
>           .version("0.11")
>           .topic("user")
>           .startFromEarliest()
>           .property("zookeeper.connect", "")
>           .property("bootstrap.servers", "")
>       )
>       .withFormat(
>         new Json()
>           .failOnMissingField(false)
>           .deriveSchema() // use the table's schema
>       )
>       .withSchema(
>         new Schema()
>           .field("username_skey", Types.STRING)
>       )
>       .inAppendMode()
>       .registerTableSource("user")
>
>     val userTest: Table = tEnv.sqlQuery(
>       """
>         select ** from ** join **""".stripMargin)
>
>     val endStream = tEnv.toRetractStream[Row](userTest)
>     endStream.writeAsText("/tmp/sqlres", WriteMode.OVERWRITE)
>     sEnv.execute("Test_New_Sign_Student")
>   }
>
> My local test was successful, but when I submit the job to the cluster, I
> get the following error:
>
> =======================================================
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
> a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory'
> in the classpath.
>
> Reason: No factory implements 'org.apache.flink.table.factories.DeserializationSchemaFactory'.
>
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> ....
> schema.9.name=roles
> schema.9.type=VARCHAR
> update-mode=append
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
>
>     at org.apache.flink.table.factories.TableFactoryService$.filterByFactoryClass(TableFactoryService.scala:176)
>     at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:125)
>     at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
>     at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:50)
>     at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:44)
>     at org.clay.test.Test_New_Sign_Student$.main(Test_New_Sign_Student.scala:64)
>     at org.clay.test.Test_New_Sign_Student.main(Test_New_Sign_Student.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>
> ===================================
>
> Can someone tell me what caused this? I am very confused about this.
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
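
P.S. If you package an uber jar (sbt-assembly or the Maven shade plugin), also check that the META-INF/services files are merged rather than overwritten. Flink discovers table factories through Java's ServiceLoader, so if the service file entries contributed by flink-json get dropped during packaging you see exactly this exception even though the classes are inside the jar. A rough sketch for sbt-assembly (assuming that is your build setup; with Maven the ServicesResourceTransformer does the same job):

    // build.sbt (sbt-assembly): concatenate service files so that every
    // org.apache.flink.table.factories.TableFactory entry survives packaging
    assemblyMergeStrategy in assembly := {
      case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
      case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
    }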