Hi Fabian,

Thanks for the prompt reply, and it's working 🤗.
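For anyone following the thread, the change amounts to joining the per-rule queries with UNION ALL instead of UNION, so that no duplicate elimination (and hence no retraction stream) is involved. Below is a minimal sketch of just the string-building part, assuming the rules are held as a list of SQL strings as in the source code quoted further down. The class name UnionAllQueryBuilder and the method buildQuery are illustrative names only, not part of Flink or of my actual job.

```java
import java.util.Arrays;
import java.util.List;

public class UnionAllQueryBuilder {

    // Joins the per-rule SELECT statements with UNION ALL.
    // UNION ALL keeps duplicate rows, so the combined query is not
    // rewritten into a duplicate-eliminating aggregation.
    public static String buildQuery(List<String> rules) {
        return String.join("\nUNION ALL\n", rules);
    }

    public static void main(String[] args) {
        // Shortened, hypothetical versions of the rule queries from the thread.
        List<String> rules = Arrays.asList(
            "SELECT TiggerID, 'Rule3' AS RuleName FROM dataTable WHERE InterceptID = 4508724",
            "SELECT TiggerID, 'Rule2' AS RuleName FROM dataTable WHERE InterceptID = 4508724",
            "SELECT TiggerID, 'Rule1' AS RuleName FROM dataTable WHERE InterceptID = 4508724");

        // The resulting string is what would be passed to tableEnv.sqlQuery(...).
        System.out.println(buildQuery(rules));
    }
}
```

The combined string would then be handed to tableEnv.sqlQuery(...) exactly as before; only the separator between the rule queries changes.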
I am trying to deploy 200 SQL unions, and it seems all the tasks fail after some time. How do I allocate memory for the task manager and the job manager? What factors need to be considered?

Cheers,
Dhanuka

On Sun, 13 Jan 2019, 22:05 Fabian Hueske <fhue...@gmail.com wrote:

> Hi Dhanuka,
>
> The important error message here is "AppendStreamTableSink requires that
> Table has only insert changes".
> This is because you use UNION instead of UNION ALL, which implies
> duplicate elimination.
> Unfortunately, UNION is currently implemented internally as a regular
> aggregation, which produces a retraction stream (although this would not
> be necessary).
>
> If you don't require duplicate elimination, you can replace UNION by UNION
> ALL and the query should work.
> If you require duplicate elimination, it is currently not possible to use
> SQL for your use case.
>
> There is a Jira issue, FLINK-9422, to improve this case [1].
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9422
>
> On Sun, 13 Jan 2019 at 14:43, dhanuka ranasinghe <
> dhanuka.priyan...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am trying to select multiple results from Kafka and send the results to
>> a different Kafka topic using the Table API, but I am getting the error
>> below. Could you please help me with this?
>>
>> Query:
>>
>> SELECT TiggerID, 'Rule3' as RuleName, mytime(ts) as ts1,
>>        mytime(CURRENT_TIMESTAMP) as ts2
>> FROM dataTable
>> WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>> UNION
>> SELECT TiggerID, 'Rule2' as RuleName, mytime(ts) as ts1,
>>        mytime(CURRENT_TIMESTAMP) as ts2
>> FROM dataTable
>> WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>> UNION
>> SELECT TiggerID, 'Rule1' as RuleName, mytime(ts) as ts1,
>>        mytime(CURRENT_TIMESTAMP) as ts2
>> FROM dataTable
>> WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>
>> Error:
>>
>> 2019-01-13 21:36:36,228 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler.
>> org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
>>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>     ... 3 more
>> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
>>     ... 4 more
>> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>     at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
>>     at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
>>     at org.apache.flink.table.api.Table.insertInto(table.scala:877)
>>     at org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>     ... 9 more
>>
>> Source Code:
>>
>> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>> tableEnv.registerFunction("mytime", new MyTime(10));
>>
>> tableEnv.connect(new Kafka().version("0.10").topic("testin")
>>         .properties(kConsumer)
>>         .startFromLatest())
>>     .withFormat(new Json().jsonSchema(schemaContent).failOnMissingField(false).deriveSchema())
>>     .withSchema(new Schema()
>>         .field("InterceptID", "DECIMAL")
>>         .field("Provider_Info", "VARCHAR")
>>         .field("LIID", "VARCHAR")
>>         .field("TiggerID", "DECIMAL")
>>         .field("ts", Types.SQL_TIMESTAMP())
>>         .rowtime(new Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000)))
>>     .inAppendMode()
>>     .registerTableSource(sourceTable);
>>
>> // WindowedTable windowedTable =
>> //     tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
>> // tableEnv.sqlQuery(query)
>>
>> StringBuilder multi = new StringBuilder();
>> for (String sql : rules) {
>>     if (multi.length() > 0) {
>>         multi.append(" UNION ").append("\n");
>>     }
>>     multi.append(sql);
>> }
>> LOGGER.info("********************************* " + multi.toString());
>>
>> Table result = tableEnv.sqlQuery(multi.toString());
>>
>> tableEnv
>>     // declare the external system to connect to
>>     .connect(new Kafka().version("0.10")
>>         .topic("testout").startFromEarliest()
>>         .properties(kProducer))
>>     .withFormat(new Json().failOnMissingField(false).deriveSchema())
>>     .withSchema(new Schema()
>>         .field("TiggerID", Types.DECIMAL())
>>         .field("RuleName", Types.STRING())
>>         .field("ts1", Types.STRING())
>>         .field("ts2", Types.STRING()))
>>     // specify the update-mode for streaming tables
>>     .inAppendMode()
>>     // register as source, sink, or both and under a name
>>     .registerTableSourceAndSink("ruleTable");
>>
>> // tableEnv.sqlUpdate("INSERT INTO ruleTable " + result);
>> result.insertInto("ruleTable");
>>
>> Cheers,
>> Dhanuka
>>
>> --
>> Nothing Impossible, Creativity is more important than knowledge.