Hi Dhanuka,

The important error message here is "AppendStreamTableSink requires that
Table has only insert changes".
This is because you use UNION instead of UNION ALL, which implies duplicate
elimination.
Unfortunately, UNION is currently internally implemented as a regular
aggregration which produces a retraction stream (although, this would not
be necessary).

If you don't require duplicate elimination, you can replace UNION by UNION
ALL and the query should work.
If you require duplicate elimination, it is currently not possible to use
SQL for your use case.

There is thea Jira issue FLINK-9422 to improve this case [1].

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9422

Am So., 13. Jan. 2019 um 14:43 Uhr schrieb dhanuka ranasinghe <
dhanuka.priyan...@gmail.com>:

> Hi All,
>
> I am trying to select multiple results from Kafka and send results to Kafka
> different topic using Table API. But I am getting below error. Could you
> please help me on this.
>
> Query:
>
> SELECT TiggerID,'Rule3' as RuleName,mytime(ts) as ts1 ,
> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724
> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>  UNION
> SELECT TiggerID,'Rule2' as RuleName,mytime(ts) as ts1 ,
> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724
> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>  UNION
> SELECT TiggerID,'Rule1' as RuleName,mytime(ts) as ts1 ,
> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID = 4508724
> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>
>
> *Error:*
>
> 2019-01-13 21:36:36,228 ERROR
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception
> occurred in REST handler.
> org.apache.flink.runtime.rest.handler.RestHandlerException:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error.
> at
>
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
> at
>
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
>
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
>
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error.
> at
>
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
> at
>
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ... 3 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
> main method caused an error.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
>
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> at
>
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
> at
>
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
> at
>
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
> ... 4 more
> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink
> requires that Table has only insert changes.
> at
>
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
> at
>
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
> at org.apache.flink.table.api.Table.insertInto(table.scala:877)
> at
>
> org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> ... 9 more
>
>
> *Source Code:*
>
>
>
>
>
>
>
>
> *StreamTableEnvironment tableEnv =
> TableEnvironment.getTableEnvironment(env);
> tableEnv.registerFunction("mytime", new MyTime(10)); tableEnv.connect(new
> Kafka().version("0.10").topic("testin") .properties(kConsumer)
> .startFromLatest()) .withFormat(new
> Json().jsonSchema(schemaContent).failOnMissingField(false).deriveSchema())
> .withSchema(new Schema() .field("InterceptID", "DECIMAL")
> .field("Provider_Info", "VARCHAR") .field("LIID", "VARCHAR")
> .field("TiggerID", "DECIMAL") .field("ts", Types.SQL_TIMESTAMP())
> .rowtime(new
> Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000)))
> .inAppendMode() .registerTableSource(sourceTable); // WindowedTable
> windowedTable = //
> tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
> //tableEnv.sqlQuery(query) StringBuilder multi = new StringBuilder();
> for(String sql : rules) {     if(multi.length() > 0) { multi.append(" UNION
> ").append("\n");     }     multi.append( sql);      }
> LOGGER.info("********************************* " + multi.toString()); Table
> result = tableEnv.sqlQuery(multi.toString()); tableEnv // declare the
> external system to connect to .connect(new Kafka().version("0.10")
> .topic("testout").startFromEarliest() .properties(kProducer) )
> .withFormat(new Json().failOnMissingField(false).deriveSchema())
> .withSchema(new Schema() .field("TiggerID", Types.DECIMAL())
> .field("RuleName", Types.STRING()) .field("ts1", Types.STRING())
> .field("ts2", Types.STRING()) ) // specify the update-mode for streaming
> tables .inAppendMode() // register as source, sink, or both and under a
> name .registerTableSourceAndSink("ruleTable"); //tableEnv.sqlUpdate(
> "INSERT INTO ruleTable " + result); result.insertInto("ruleTable");
> Cheers,Dhanuka*
>
>
>
> --
> Nothing Impossible,Creativity is more important than knowledge.
>

Reply via email to