@Gyula Fóra <gyula.f...@gmail.com> I think your query is right, we should
produce insert-only results if you have event time and watermark defined.
I've created https://issues.apache.org/jira/browse/FLINK-16466 to track this
issue.

Best,
Kurt
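(For context, "event time and watermark defined" concretely means the source
tables declare a rowtime attribute with a watermark. Below is a minimal sketch
of such a DDL issued through the Java Table API on Flink 1.10 with the Blink
planner; the column names follow the queries quoted further down, while the
topic name and the 5-second watermark delay are illustrative assumptions only.)

// Sketch only: set up a streaming TableEnvironment and register a source
// whose event_time column is a rowtime attribute with a watermark.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

tEnv.sqlUpdate(
    "CREATE TABLE ItemTransactions (" +
    "  itemId STRING," +
    "  quantity INT," +
    "  event_time TIMESTAMP(3)," +
    // the WATERMARK clause is what makes event_time usable for interval joins and windows
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector.type' = 'kafka'," +
    "  'connector.version' = 'universal'," +
    "  'connector.topic' = 'transaction.log.1'," +   // hypothetical topic name
    "  'connector.properties.bootstrap.servers' = '<broker>'," +
    "  'format.type' = 'json'" +
    ")");

With the watermark declared this way, the event_time columns used in the
queries below are proper rowtime attributes, which is the precondition Kurt
refers to above.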
On Fri, Mar 6, 2020 at 12:14 PM Kurt Young <ykt...@gmail.com> wrote:

> Actually this use case led me to start thinking about one question:
> if watermark is enabled, could we also support GROUP BY event_time instead
> of forcing the user to define a window based on the event_time?
>
> GROUP BY a standalone event_time can also be treated as a special window,
> which has both start_time and end_time equal to the event_time. And when
> the watermark surpasses the event_time, we can still get the complete data
> of such a group, do the required aggregation and then emit insert-only
> results.
>
> That would spare users from having to define a window when they already
> have event time and watermark defined.
>
> Best,
> Kurt
>
>
> On Fri, Mar 6, 2020 at 10:26 AM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Does a 5-second tumbling aggregation meet your need? For example:
>>
>> INSERT INTO QueryResult
>> SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5' SECOND),
>>   sum(t.quantity) AS quantity
>> FROM
>>   ItemTransactions AS t,
>>   Queries AS q
>> WHERE
>>   t.itemId = q.itemId AND
>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>> GROUP BY
>>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>>
>> Best,
>> Jark
>>
>> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> I see, maybe I just don't understand how to properly express what I am
>>> trying to compute.
>>>
>>> Basically I want to aggregate the quantities of the transactions that
>>> happened in the 5 seconds before the query.
>>> Every queryId belongs to a single query (event_time, itemId) but still
>>> I have to group :/
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> I think the issue is not caused by the event time interval join, but
>>>> by the aggregation after the join:
>>>> GROUP BY t.itemId, q.event_time, q.queryId;
>>>>
>>>> In this case, there is still no way for Flink to determine whether
>>>> groups like (itemId, event_time, queryId) have complete data or not.
>>>> As a comparison, if you change the grouping key to a window based only
>>>> on q.event_time, then the query would emit insert-only results.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gyula.f...@gmail.com>
>>>> wrote:
>>>>
>>>>> That's exactly the kind of behaviour I am looking for, Kurt ("ignore
>>>>> all delete messages").
>>>>>
>>>>> As for data completeness, my example above is basically an event time
>>>>> interval join.
>>>>> With watermarks defined, Flink should be able to compute results once,
>>>>> in exactly the same way as for the tumbling window.
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>>
>>>>>> Back to this case, I assume you are expecting something like an
>>>>>> "ignore all delete messages" flag? With this flag turned on, Flink
>>>>>> would only send insert messages corresponding to the current correct
>>>>>> results to Kafka and drop all retractions and deletes on the fly.
>>>>>>
>>>>>> Best,
>>>>>> Kurt
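(No such flag exists in Flink 1.10, but a similar effect can be had outside
pure SQL by converting the result table to a retract stream and dropping the
delete messages before writing to Kafka. A rough sketch, reusing the query
from further down the thread; env/tEnv are assumed to be set up as in the
earlier snippet, and the string serialization and job name are placeholders.)

// Run the updating query, then keep only accumulate messages (f0 == true)
// and drop retractions (f0 == false) before sinking to Kafka.
Table result = tEnv.sqlQuery(
    "SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity " +
    "FROM ItemTransactions AS t, Queries AS q " +
    "WHERE t.itemId = q.itemId " +
    "  AND t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time " +
    "GROUP BY t.itemId, q.event_time, q.queryId");

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "<broker>");

tEnv.toRetractStream(result, Row.class)
    .filter(change -> change.f0)          // true = accumulate, false = retract
    .map(change -> change.f1.toString())  // placeholder serialization; use a JSON schema in practice
    .addSink(new FlinkKafkaProducer<>(
        "query.output.log.1", new SimpleStringSchema(), kafkaProps));

env.execute("insert-only-results-to-kafka");

Note that this still writes one record per update, i.e. a changelog of the
latest values per (queryId, itemId), which seems to match the "always keep
sending the latest" behaviour Gyula asks about below.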
>>>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>
>>>>>>> > I also don't completely understand at this point why I can write
>>>>>>> > the result of a group, tumble window aggregate to Kafka and not
>>>>>>> > this window join / aggregate.
>>>>>>>
>>>>>>> If you are doing a tumble window aggregate with watermark enabled,
>>>>>>> Flink will fire a single final result for each window; no
>>>>>>> modifications or retractions will happen after a window is
>>>>>>> calculated and fired.
>>>>>>> But with other, arbitrary aggregations there is not enough
>>>>>>> information for Flink to determine whether the data is complete or
>>>>>>> not, so the framework keeps recalculating results as new records
>>>>>>> arrive and retracts earlier results by firing retraction/deletion
>>>>>>> messages.
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Benoît!
>>>>>>>>
>>>>>>>> I can see now how I can implement this myself through the provided
>>>>>>>> sink interfaces, but I was trying to avoid having to write code for
>>>>>>>> this :D
>>>>>>>> My initial motivation was to see whether we are able to write out
>>>>>>>> any kind of table to Kafka as a simple stream of "upserts".
>>>>>>>>
>>>>>>>> I also don't completely understand at this point why I can write
>>>>>>>> the result of a group, tumble window aggregate to Kafka and not
>>>>>>>> this window join / aggregate.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Gyula,
>>>>>>>>>
>>>>>>>>> I'm afraid the conversion to see retractions vs. inserts can't be
>>>>>>>>> done in pure SQL (though I'd love that feature).
>>>>>>>>>
>>>>>>>>> You might want to go lower level and implement a
>>>>>>>>> RetractStreamTableSink [1][2] that you would wrap around a
>>>>>>>>> KafkaTableSink [3]. This gives you an
>>>>>>>>> emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream) method,
>>>>>>>>> in which the Boolean flag gives you an 'accumulate' or 'retract'
>>>>>>>>> signal.
>>>>>>>>> You can then filter the DataStream accordingly before passing it
>>>>>>>>> to the KafkaTableSink.
>>>>>>>>>
>>>>>>>>> Hope this helps.
>>>>>>>>>
>>>>>>>>> Best regards
>>>>>>>>> Benoît
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>>>>> [3]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
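(A rough sketch of the wrapper Benoît describes, against the Flink 1.10 sink
interfaces; the class name InsertOnlyWrapperSink is made up, configure() is
simplified, and the wrapped sink is assumed to be an append-only Row sink such
as a KafkaTableSink. In 1.10, consumeDataStream is the non-deprecated
counterpart of the emitDataStream method mentioned above.)

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

/** Accepts a retract stream, drops retractions, forwards inserts to an append-only sink. */
public class InsertOnlyWrapperSink implements RetractStreamTableSink<Row> {

    private final AppendStreamTableSink<Row> wrapped; // e.g. a KafkaTableSink

    public InsertOnlyWrapperSink(AppendStreamTableSink<Row> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return wrapped.getTableSchema().toRowType();
    }

    @Override
    public TableSchema getTableSchema() {
        return wrapped.getTableSchema();
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return this; // simplified: a real implementation should return a configured copy
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        DataStream<Row> insertsOnly = dataStream
            .filter(change -> change.f0)   // true = accumulate, false = retract
            .map(change -> change.f1)
            .returns(getRecordType());
        return wrapped.consumeDataStream(insertsOnly);
    }
}

Once registered with the table environment (e.g. via registerTableSink), the
INSERT INTO statement below should plan against this retract-capable sink
instead of failing on the append-only one.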
>>>>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Roman,
>>>>>>>>>>
>>>>>>>>>> This is the core logic:
>>>>>>>>>>
>>>>>>>>>> CREATE TABLE QueryResult (
>>>>>>>>>>   queryId BIGINT,
>>>>>>>>>>   itemId STRING,
>>>>>>>>>>   quantity INT
>>>>>>>>>> ) WITH (
>>>>>>>>>>   'connector.type' = 'kafka',
>>>>>>>>>>   'connector.version' = 'universal',
>>>>>>>>>>   'connector.topic' = 'query.output.log.1',
>>>>>>>>>>   'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>>>>>   'format.type' = 'json'
>>>>>>>>>> );
>>>>>>>>>>
>>>>>>>>>> INSERT INTO QueryResult
>>>>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>>>>> FROM
>>>>>>>>>>   ItemTransactions AS t,
>>>>>>>>>>   Queries AS q
>>>>>>>>>> WHERE
>>>>>>>>>>   t.itemId = q.itemId AND
>>>>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>>>>>>>>>> GROUP BY
>>>>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>>>>
>>>>>>>>>> And the error I get is:
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
>>>>>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>>>>>   at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>>>>>   at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>>>>>   at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>>>>>   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>>>>>   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>>>>>   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>>>>>   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>>>>> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>>>>>>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>
>>>>>>>>>> I am wondering what I could do to simply pump the result updates
>>>>>>>>>> to Kafka here.
>>>>>>>>>>
>>>>>>>>>> Gyula
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>>>>> khachatryan.ro...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Gyula,
>>>>>>>>>>>
>>>>>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>>>>>> stacktrace and the Flink version?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Roman
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All!
>>>>>>>>>>>>
>>>>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API
>>>>>>>>>>>> and I am trying to play around with it by implementing and
>>>>>>>>>>>> running a few use cases.
>>>>>>>>>>>>
>>>>>>>>>>>> I have a simple window join + aggregation, grouped on some id,
>>>>>>>>>>>> that I want to write to Kafka, but I am hitting the following
>>>>>>>>>>>> error:
>>>>>>>>>>>>
>>>>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>>>>> changes."
>>>>>>>>>>>>
>>>>>>>>>>>> If I understand correctly, the problem here is that since
>>>>>>>>>>>> updates are possible within a single group, we have a retract
>>>>>>>>>>>> stream and the Kafka sink cannot handle that. I tried to search
>>>>>>>>>>>> for a solution but I haven't found any satisfying answers.
>>>>>>>>>>>>
>>>>>>>>>>>> How can I simply tell the INSERT logic to ignore previous
>>>>>>>>>>>> values and just always keep sending the latest (like you would
>>>>>>>>>>>> see on the CLI output)?
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you!
>>>>>>>>>>>> Gyula
>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Benoît Paris
>>>>>>>>> Ingénieur Machine Learning Explicable
>>>>>>>>> Tél : +33 6 60 74 23 00
>>>>>>>>> http://benoit.paris
>>>>>>>>> http://explicable.ml