Hi Fabian,

Thanks for the prompt reply, and it's working 🤗.
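
For reference, here is a minimal sketch of the change, assuming the rule queries are plain SELECT strings as in my original code. The class and method names (UnionAllQueryBuilder, buildQuery) and the shortened queries are only illustrative:

```java
import java.util.Arrays;
import java.util.List;

class UnionAllQueryBuilder {

    // Joins the per-rule SELECT statements with UNION ALL instead of UNION.
    // UNION ALL does not eliminate duplicates, so the combined query
    // only produces insert changes.
    public static String buildQuery(List<String> rules) {
        return String.join("\nUNION ALL\n", rules);
    }

    public static void main(String[] args) {
        // Hypothetical, shortened versions of the rule queries.
        List<String> rules = Arrays.asList(
            "SELECT TiggerID, 'Rule3' AS RuleName FROM dataTable WHERE InterceptID = 4508724",
            "SELECT TiggerID, 'Rule2' AS RuleName FROM dataTable WHERE InterceptID = 4508724");
        System.out.println(buildQuery(rules));
    }
}
```

The resulting string is what gets passed to tableEnv.sqlQuery(...) in place of the UNION version.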

I am trying to deploy 200 SQL unions, and it seems all the tasks fail
after some time.

How do I allocate memory for the task manager and the job manager? What
factors need to be considered?

Cheers
Dhanuka

On Sun, 13 Jan 2019, 22:05 Fabian Hueske <fhue...@gmail.com wrote:

> Hi Dhanuka,
>
> The important error message here is "AppendStreamTableSink requires that
> Table has only insert changes".
> This is because you use UNION instead of UNION ALL, which implies
> duplicate elimination.
> Unfortunately, UNION is currently internally implemented as a regular
> aggregation which produces a retraction stream (although this would not
> be necessary).
>
> If you don't require duplicate elimination, you can replace UNION by UNION
> ALL and the query should work.
> If you require duplicate elimination, it is currently not possible to use
> SQL for your use case.
>
> There is the Jira issue FLINK-9422 to improve this case [1].
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9422
>
> On Sun, 13 Jan 2019 at 14:43, dhanuka ranasinghe <
> dhanuka.priyan...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am trying to select multiple results from Kafka and send them to a
>> different Kafka topic using the Table API, but I am getting the error
>> below. Could you please help me with this?
>>
>> Query:
>>
>> SELECT TiggerID, 'Rule3' AS RuleName, mytime(ts) AS ts1,
>>        mytime(CURRENT_TIMESTAMP) AS ts2
>> FROM dataTable
>> WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka'
>>   AND LIID LIKE '%193400835%'
>> UNION
>> SELECT TiggerID, 'Rule2' AS RuleName, mytime(ts) AS ts1,
>>        mytime(CURRENT_TIMESTAMP) AS ts2
>> FROM dataTable
>> WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka'
>>   AND LIID LIKE '%193400835%'
>> UNION
>> SELECT TiggerID, 'Rule1' AS RuleName, mytime(ts) AS ts1,
>>        mytime(CURRENT_TIMESTAMP) AS ts2
>> FROM dataTable
>> WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka'
>>   AND LIID LIKE '%193400835%'
>>
>>
>> *Error:*
>>
>> 2019-01-13 21:36:36,228 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception occurred in REST handler.
>> org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
>>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>     ... 3 more
>> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
>>     ... 4 more
>> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>     at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
>>     at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
>>     at org.apache.flink.table.api.Table.insertInto(table.scala:877)
>>     at org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>     ... 9 more
>>
>>
>> *Source Code:*
>>
>> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>> tableEnv.registerFunction("mytime", new MyTime(10));
>>
>> tableEnv.connect(new Kafka().version("0.10").topic("testin")
>>         .properties(kConsumer)
>>         .startFromLatest())
>>     .withFormat(new Json().jsonSchema(schemaContent).failOnMissingField(false).deriveSchema())
>>     .withSchema(new Schema()
>>         .field("InterceptID", "DECIMAL")
>>         .field("Provider_Info", "VARCHAR")
>>         .field("LIID", "VARCHAR")
>>         .field("TiggerID", "DECIMAL")
>>         .field("ts", Types.SQL_TIMESTAMP())
>>         .rowtime(new Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000)))
>>     .inAppendMode()
>>     .registerTableSource(sourceTable);
>>
>> // WindowedTable windowedTable =
>> //     tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
>> // tableEnv.sqlQuery(query)
>> StringBuilder multi = new StringBuilder();
>> for (String sql : rules) {
>>     if (multi.length() > 0) {
>>         multi.append(" UNION ").append("\n");
>>     }
>>     multi.append(sql);
>> }
>> LOGGER.info("********************************* " + multi.toString());
>> Table result = tableEnv.sqlQuery(multi.toString());
>>
>> tableEnv
>>     // declare the external system to connect to
>>     .connect(new Kafka().version("0.10")
>>         .topic("testout").startFromEarliest()
>>         .properties(kProducer))
>>     .withFormat(new Json().failOnMissingField(false).deriveSchema())
>>     .withSchema(new Schema()
>>         .field("TiggerID", Types.DECIMAL())
>>         .field("RuleName", Types.STRING())
>>         .field("ts1", Types.STRING())
>>         .field("ts2", Types.STRING()))
>>     // specify the update-mode for streaming tables
>>     .inAppendMode()
>>     // register as source, sink, or both and under a name
>>     .registerTableSourceAndSink("ruleTable");
>>
>> // tableEnv.sqlUpdate("INSERT INTO ruleTable " + result);
>> result.insertInto("ruleTable");
>>
>> Cheers,
>> Dhanuka
>>
>>
>>
>> --
>> Nothing Impossible,Creativity is more important than knowledge.
>>
>
