Hi,

That's a Java limitation: the bytecode of a single method cannot be larger than 64 KB, and the code that is generated for this predicate exceeds that limit. There is a Jira issue to fix the problem.
In the meantime, I'd follow a hybrid approach and UNION ALL only as many tables as you need to avoid the code compilation exception.

Best, Fabian

On Mon, 14 Jan 2019 at 14:15, dhanuka ranasinghe <dhanuka.priyan...@gmail.com> wrote:

> Hi Fabian,
>
> I encountered the error below with 200 OR operators, so I guess this is a
> JVM-level limitation.
>
> Error:
>
> of class "datastreamcalcrule" grows beyond 64 kb
>
> Cheers
> Dhanuka
>
> On Mon, 14 Jan 2019, 20:30 Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> you should avoid the UNION ALL approach because the query will scan the
>> (identical?) Kafka topic 200 times, which is highly inefficient.
>> You should rather use your second approach and scale the query
>> appropriately.
>>
>> Best, Fabian
>>
>> On Mon, 14 Jan 2019 at 08:39, dhanuka ranasinghe
>> <dhanuka.priyan...@gmail.com> wrote:
>>
>>> Sorry about sending the mail without completing it :)
>>>
>>> I also tried out a different approach: instead of UNION ALL, use OR as
>>> below.
>>>
>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' ) OR
>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' ) OR
>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' )
>>>
>>> The only downside is that with this approach, if all the WHERE clause
>>> condition sets are equal, Flink seems to behave as if only one condition
>>> set were used.
>>>
>>> I have attached a screenshot.
>>>
>>> Could you please explain this to me? Thanks in advance.
>>>
>>> Cheers,
>>> Dhanuka
>>>
>>> On Mon, Jan 14, 2019 at 3:35 PM dhanuka ranasinghe
>>> <dhanuka.priyan...@gmail.com> wrote:
>>>
>>>> Hi Hequn,
>>>>
>>>> I think it's obvious when we see the job graph for 200 unions. I have
>>>> attached the screenshot.
>>>>
>>>> I also tried out a different approach, which is instead of UNION ALL
>>>>
>>>> On Mon, Jan 14, 2019 at 2:57 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>
>>>>> Hi dhanuka,
>>>>>
>>>>> > I am trying to deploy 200 SQL unions and it seems all the tasks
>>>>> > fail after some time.
>>>>>
>>>>> It would be great if you could show us some information (say, the
>>>>> exception stack) about the failure. Is it caused by an OOM of the job
>>>>> manager?
>>>>>
>>>>> > How do I allocate memory for the task manager and job manager? What
>>>>> > factors need to be considered?
>>>>>
>>>>> According to your SQL, I guess you need more memory for the job
>>>>> manager [1]: since you unionAll 200 tables, the job graph should be a
>>>>> bit big. As for the task manager, I think it may be OK to use the
>>>>> default memory setting unless you allocate a lot of memory in your UDFs
>>>>> or you just want to make better use of the memory (we can discuss more
>>>>> if you like).
>>>>>
>>>>> Best, Hequn
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager
>>>>>
>>>>> On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe
>>>>> <dhanuka.priyan...@gmail.com> wrote:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> Thanks for the prompt reply, and it's working 🤗.
>>>>>>
>>>>>> I am trying to deploy 200 SQL unions and it seems all the tasks
>>>>>> fail after some time.
>>>>>>
>>>>>> How do I allocate memory for the task manager and job manager? What
>>>>>> factors need to be considered?
>>>>>>
>>>>>> Cheers
>>>>>> Dhanuka
>>>>>>
>>>>>> On Sun, 13 Jan 2019, 22:05 Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>> > Hi Dhanuka,
>>>>>> >
>>>>>> > The important error message here is "AppendStreamTableSink requires
>>>>>> > that Table has only insert changes".
>>>>>> > This is because you use UNION instead of UNION ALL, which implies
>>>>>> > duplicate elimination.
>>>>>> > Unfortunately, UNION is currently internally implemented as a regular
>>>>>> > aggregation, which produces a retraction stream (although this would
>>>>>> > not be necessary).
>>>>>> >
>>>>>> > If you don't require duplicate elimination, you can replace UNION by
>>>>>> > UNION ALL and the query should work.
>>>>>> > If you require duplicate elimination, it is currently not possible to
>>>>>> > use SQL for your use case.
>>>>>> >
>>>>>> > There is a Jira issue, FLINK-9422, to improve this case [1].
>>>>>> >
>>>>>> > Best, Fabian
>>>>>> >
>>>>>> > [1] https://issues.apache.org/jira/browse/FLINK-9422
>>>>>> >
>>>>>> > On Sun, 13 Jan 2019 at 14:43, dhanuka ranasinghe
>>>>>> > <dhanuka.priyan...@gmail.com> wrote:
>>>>>> >
>>>>>> >> Hi All,
>>>>>> >>
>>>>>> >> I am trying to select multiple results from Kafka and send the
>>>>>> >> results to a different Kafka topic using the Table API, but I am
>>>>>> >> getting the error below. Could you please help me with this?
>>>>>> >>
>>>>>> >> Query:
>>>>>> >>
>>>>>> >> SELECT TiggerID,'Rule3' as RuleName,mytime(ts) as ts1 ,
>>>>>> >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724
>>>>>> >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>>>>> >> UNION
>>>>>> >> SELECT TiggerID,'Rule2' as RuleName,mytime(ts) as ts1 ,
>>>>>> >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724
>>>>>> >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>>>>> >> UNION
>>>>>> >> SELECT TiggerID,'Rule1' as RuleName,mytime(ts) as ts1 ,
>>>>>> >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724
>>>>>> >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>>>>> >>
>>>>>> >> *Error:*
>>>>>> >>
>>>>>> >> 2019-01-13 21:36:36,228 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler.
>>>>>> >> org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>> >>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
>>>>>> >>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>>> >>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>>> >>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>> >>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
>>>>>> >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>> >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>> >>     at java.lang.Thread.run(Thread.java:748)
>>>>>> >> Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>> >>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
>>>>>> >>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>> >>     ... 3 more
>>>>>> >> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>> >>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>>>>> >>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>>>>> >>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>> >>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
>>>>>> >>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
>>>>>> >>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
>>>>>> >>     ... 4 more
>>>>>> >> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>>>>> >>     at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
>>>>>> >>     at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
>>>>>> >>     at org.apache.flink.table.api.Table.insertInto(table.scala:877)
>>>>>> >>     at org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
>>>>>> >>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> >>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> >>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> >>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> >>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>>>>> >>     ... 9 more
>>>>>> >>
>>>>>> >> *Source Code:*
>>>>>> >>
>>>>>> >> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>>>>>> >> tableEnv.registerFunction("mytime", new MyTime(10));
>>>>>> >>
>>>>>> >> tableEnv.connect(new Kafka().version("0.10").topic("testin")
>>>>>> >>         .properties(kConsumer)
>>>>>> >>         .startFromLatest())
>>>>>> >>     .withFormat(new Json().jsonSchema(schemaContent).failOnMissingField(false).deriveSchema())
>>>>>> >>     .withSchema(new Schema()
>>>>>> >>         .field("InterceptID", "DECIMAL")
>>>>>> >>         .field("Provider_Info", "VARCHAR")
>>>>>> >>         .field("LIID", "VARCHAR")
>>>>>> >>         .field("TiggerID", "DECIMAL")
>>>>>> >>         .field("ts", Types.SQL_TIMESTAMP())
>>>>>> >>         .rowtime(new Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000)))
>>>>>> >>     .inAppendMode()
>>>>>> >>     .registerTableSource(sourceTable);
>>>>>> >>
>>>>>> >> // WindowedTable windowedTable =
>>>>>> >> //     tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
>>>>>> >> // tableEnv.sqlQuery(query)
>>>>>> >>
>>>>>> >> StringBuilder multi = new StringBuilder();
>>>>>> >> for (String sql : rules) {
>>>>>> >>     if (multi.length() > 0) {
>>>>>> >>         multi.append(" UNION ").append("\n");
>>>>>> >>     }
>>>>>> >>     multi.append(sql);
>>>>>> >> }
>>>>>> >> LOGGER.info("********************************* " + multi.toString());
>>>>>> >> Table result = tableEnv.sqlQuery(multi.toString());
>>>>>> >>
>>>>>> >> tableEnv
>>>>>> >>     // declare the external system to connect to
>>>>>> >>     .connect(new Kafka().version("0.10")
>>>>>> >>         .topic("testout").startFromEarliest()
>>>>>> >>         .properties(kProducer))
>>>>>> >>     .withFormat(new Json().failOnMissingField(false).deriveSchema())
>>>>>> >>     .withSchema(new Schema()
>>>>>> >>         .field("TiggerID", Types.DECIMAL())
>>>>>> >>         .field("RuleName", Types.STRING())
>>>>>> >>         .field("ts1", Types.STRING())
>>>>>> >>         .field("ts2", Types.STRING()))
>>>>>> >>     // specify the update-mode for streaming tables
>>>>>> >>     .inAppendMode()
>>>>>> >>     // register as source, sink, or both and under a name
>>>>>> >>     .registerTableSourceAndSink("ruleTable");
>>>>>> >>
>>>>>> >> // tableEnv.sqlUpdate("INSERT INTO ruleTable " + result);
>>>>>> >> result.insertInto("ruleTable");
>>>>>> >>
>>>>>> >> Cheers,
>>>>>> >> Dhanuka
>>>>>> >>
>>>>>> >> --
>>>>>> >> Nothing Impossible,Creativity is more important than knowledge.
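
P.S. To make the hybrid approach from the top of this mail a bit more concrete, here is a rough sketch of how the query string could be assembled: predicates are combined with OR inside one SELECT until a chosen group size is reached, and the resulting SELECTs are joined with UNION ALL. The class name HybridQueryBuilder, the method build, and the chunkSize parameter are made up for illustration and are not part of Flink. The right group size is an assumption you would have to find by experiment, that is, the largest value that still compiles without the 64 KB error. The sketch also ignores the per-rule RuleName column from the original query.

```java
import java.util.ArrayList;
import java.util.List;

public class HybridQueryBuilder {

    /**
     * Builds one SQL string from many predicates (hypothetical helper).
     * Predicates are ORed together in groups of at most chunkSize,
     * and the groups are combined with UNION ALL.
     */
    public static String build(String selectFrom, List<String> predicates, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be >= 1");
        }
        List<String> branches = new ArrayList<>();
        for (int start = 0; start < predicates.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, predicates.size());
            // Wrap each predicate in parentheses so OR cannot change its meaning.
            List<String> wrapped = new ArrayList<>();
            for (String predicate : predicates.subList(start, end)) {
                wrapped.add("(" + predicate + ")");
            }
            branches.add(selectFrom + " WHERE " + String.join(" OR ", wrapped));
        }
        // UNION ALL keeps the result append-only (no duplicate elimination).
        return String.join("\nUNION ALL\n", branches);
    }

    public static void main(String[] args) {
        List<String> predicates = new ArrayList<>();
        predicates.add("InterceptID = 4508724 AND Provider_Info = 'Dhanuka'");
        predicates.add("LIID LIKE '%193400835%'");
        predicates.add("InterceptID = 4508725");
        // A group size of 2 is only for demonstration.
        System.out.println(build("SELECT TiggerID FROM dataTable", predicates, 2));
    }
}
```

Every UNION ALL branch reads the Kafka topic again, so the fewer branches you need, the better.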