Hi all,

I am trying to select multiple results from Kafka and write them to a different Kafka topic using the Table API, but I am getting the error below. Could you please help me with this?
Query:

    SELECT TiggerID, 'Rule3' as RuleName, mytime(ts) as ts1, mytime(CURRENT_TIMESTAMP) as ts2
    FROM dataTable
    WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
    UNION
    SELECT TiggerID, 'Rule2' as RuleName, mytime(ts) as ts1, mytime(CURRENT_TIMESTAMP) as ts2
    FROM dataTable
    WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
    UNION
    SELECT TiggerID, 'Rule1' as RuleName, mytime(ts) as ts1, mytime(CURRENT_TIMESTAMP) as ts2
    FROM dataTable
    WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'

*Error:*

    2019-01-13 21:36:36,228 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler.
    org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
        ... 3 more
    Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
        at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
        at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
        ... 4 more
    Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
        at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
        at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
        at org.apache.flink.table.api.Table.insertInto(table.scala:877)
        at org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
        ... 9 more

*Source Code:*

    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    tableEnv.registerFunction("mytime", new MyTime(10));

    tableEnv.connect(new Kafka().version("0.10").topic("testin")
            .properties(kConsumer)
            .startFromLatest())
        .withFormat(new Json().jsonSchema(schemaContent).failOnMissingField(false).deriveSchema())
        .withSchema(new Schema()
            .field("InterceptID", "DECIMAL")
            .field("Provider_Info", "VARCHAR")
            .field("LIID", "VARCHAR")
            .field("TiggerID", "DECIMAL")
            .field("ts", Types.SQL_TIMESTAMP())
            .rowtime(new Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000)))
        .inAppendMode()
        .registerTableSource(sourceTable);

    // WindowedTable windowedTable =
    // tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
    //tableEnv.sqlQuery(query)

    StringBuilder multi = new StringBuilder();
    for (String sql : rules) {
        if (multi.length() > 0) {
            multi.append(" UNION ").append("\n");
        }
        multi.append(sql);
    }
    LOGGER.info("********************************* " + multi.toString());

    Table result = tableEnv.sqlQuery(multi.toString());

    tableEnv
        // declare the external system to connect to
        .connect(new Kafka().version("0.10")
            .topic("testout").startFromEarliest()
            .properties(kProducer))
        .withFormat(new Json().failOnMissingField(false).deriveSchema())
        .withSchema(new Schema()
            .field("TiggerID", Types.DECIMAL())
            .field("RuleName", Types.STRING())
            .field("ts1", Types.STRING())
            .field("ts2", Types.STRING()))
        // specify the update-mode for streaming tables
        .inAppendMode()
        // register as source, sink, or both and under a name
        .registerTableSourceAndSink("ruleTable");

    //tableEnv.sqlUpdate( "INSERT INTO ruleTable " + result);
    result.insertInto("ruleTable");

Cheers,
Dhanuka

--
Nothing Impossible, Creativity is more important than knowledge.
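
P.S. One variation that might be worth trying, sketched here under the assumption that keeping duplicate rows in the output is acceptable: plain UNION removes duplicates, so on a stream its result can contain updates rather than inserts only, while UNION ALL simply concatenates the rows and stays append-only. The class and method names below (`RuleQueryBuilder`, `unionAll`) are hypothetical and not part of the program above; the snippet only shows how the rule queries could be joined.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original program.
class RuleQueryBuilder {

    // Joins the per-rule SELECT statements with UNION ALL instead of UNION,
    // so the combined query performs no duplicate elimination.
    static String unionAll(List<String> rules) {
        return String.join(" UNION ALL\n", rules);
    }

    public static void main(String[] args) {
        // Shortened example queries, for illustration only.
        List<String> rules = Arrays.asList(
            "SELECT TiggerID, 'Rule3' as RuleName FROM dataTable",
            "SELECT TiggerID, 'Rule2' as RuleName FROM dataTable");
        System.out.println(unionAll(rules));
    }
}
```

The resulting string would be passed to `tableEnv.sqlQuery(...)` in place of `multi.toString()`. If duplicates do have to be removed, an append-only sink would presumably not be suitable for the result.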