Thanks Kurt, I came to the same conclusions after trying what Jark provided. I can get similar behaviour if I reduce the grouping window to 1 sec but still keep the join window large.
Gyula

On Fri, Mar 6, 2020 at 3:09 PM Kurt Young <ykt...@gmail.com> wrote:

@Gyula Fóra <gyula.f...@gmail.com> I think your query is right; we should produce insert-only results if you have event time and watermark defined. I've created https://issues.apache.org/jira/browse/FLINK-16466 to track this issue.

Best,
Kurt

On Fri, Mar 6, 2020 at 12:14 PM Kurt Young <ykt...@gmail.com> wrote:

Actually this use case led me to start thinking about one question: if watermarks are enabled, could we also support GROUP BY event_time instead of forcing the user to define a window based on the event_time?

GROUP BY a standalone event_time can also be treated as a special window whose start_time and end_time both equal the event_time. And when the watermark surpasses the event_time, we can still get the complete data of such a group, do the required aggregation, and then emit insert-only results.

That would ease the user's burden of having to define a window when they already have event time and watermark defined.

Best,
Kurt

On Fri, Mar 6, 2020 at 10:26 AM Jark Wu <imj...@gmail.com> wrote:

Hi Gyula,

Does tumbling 5 seconds for the aggregation meet your need? For example:

INSERT INTO QueryResult
SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5' SECOND), sum(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
  t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);

Best,
Jark

On Thu, 5 Mar 2020 at 23:05, Gyula Fóra <gyula.f...@gmail.com> wrote:

I see, maybe I just don't understand how to properly express what I am trying to compute.

Basically I want to aggregate the quantities of the transactions that happened in the 5 seconds before the query. Every query.id belongs to a single query (event_time, itemId), but I still have to group :/

Gyula

On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <ykt...@gmail.com> wrote:

I think the issue is not caused by the event time interval join, but by the aggregation after the join:

GROUP BY t.itemId, q.event_time, q.queryId;

In this case, there is still no chance for Flink to determine whether groups like (itemId, event_time, queryId) have complete data or not. As a comparison, if you change the grouping key to a window based only on q.event_time, then the query will emit insert-only results.

Best,
Kurt

On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

That's exactly the kind of behaviour I am looking for, Kurt ("ignore all delete messages").

As for data completeness, my example above is basically an event time interval join. With watermarks defined, Flink should be able to compute the results once, in exactly the same way as for the tumbling window.

Gyula
On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <ykt...@gmail.com> wrote:

Back to this case, I assume you are expecting something like an "ignore all delete messages" flag? With this flag turned on, Flink would only send insert messages corresponding to the current correct results to Kafka and drop all retractions and deletes on the fly.

Best,
Kurt

On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <ykt...@gmail.com> wrote:

> I also don't completely understand at this point why I can write the result of a group, tumble window aggregate to Kafka and not this window join / aggregate.

If you are doing a tumble window aggregate with watermarks enabled, Flink fires only one final result per window; no modifications or retractions happen after a window has been calculated and fired. But with other, arbitrary aggregations there is not enough information for Flink to determine whether the data is complete or not, so the framework keeps recalculating results as new records arrive and retracts earlier results by firing retraction/deletion messages.

Best,
Kurt

On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

Thanks Benoît!

I can see now how I can implement this myself through the provided sink interfaces, but I was trying to avoid having to write code for this :D
My initial motivation was to see whether we are able to write out any kind of table to Kafka as a simple stream of "upserts".

I also don't completely understand at this point why I can write the result of a group, tumble window aggregate to Kafka and not this window join / aggregate.

Cheers,
Gyula

On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <benoit.pa...@centraliens-lille.org> wrote:

Hi Gyula,

I'm afraid separating the retractions from the inserts can't be done in pure SQL (though I'd love that feature).

You might want to go lower level and implement a RetractStreamTableSink [1][2] that you wrap around a KafkaTableSink [3]. This gives you an emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream), in which the Boolean flag gives you an 'accumulate' or 'retract' signal. You can then filter the DataStream accordingly before passing it to the KafkaTableSink.

Hope this helps.

Best regards
Benoît

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
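For reference, a minimal, untested sketch of the wrapper described above, written against the Flink 1.10 interfaces. The class name InsertOnlyKafkaSink is just illustrative, and consumeDataStream is used in place of the deprecated emitDataStream mentioned above:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

/** Sketch: forwards only accumulate messages to a wrapped KafkaTableSink, dropping retractions. */
public class InsertOnlyKafkaSink implements RetractStreamTableSink<Row> {

    private final KafkaTableSink kafkaSink;

    public InsertOnlyKafkaSink(KafkaTableSink kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return kafkaSink.getTableSchema().toRowType();
    }

    @Override
    public TableSchema getTableSchema() {
        return kafkaSink.getTableSchema();
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // The schema is fixed by the wrapped sink; nothing to reconfigure in this sketch.
        return this;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        // f0 == true means accumulate (insert), f0 == false means retract/delete.
        DataStream<Row> insertsOnly = dataStream
                .filter(change -> change.f0)
                .map(change -> change.f1)
                .returns(getRecordType());
        return kafkaSink.consumeDataStream(insertsOnly);
    }
}

From a Java program such a sink could be registered with tableEnv.registerTableSink(...) and targeted by INSERT INTO; wiring it into the SQL CLI would additionally require a table sink factory, which is not covered here.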
On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

Hi Roman,

This is the core logic:

CREATE TABLE QueryResult (
  queryId BIGINT,
  itemId STRING,
  quantity INT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'query.output.log.1',
  'connector.properties.bootstrap.servers' = '<broker>',
  'format.type' = 'json'
);

INSERT INTO QueryResult
SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
  t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  t.itemId, q.event_time, q.queryId;

And the error I get is:

org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
  at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
  at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
  at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
  at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
  at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
  at java.util.Optional.ifPresent(Optional.java:159)
  at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
  at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
  at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

I am wondering what I could do to simply pump the result updates to Kafka here.

Gyula
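If the query is run from a Java program rather than the SQL client, another option (not discussed in this thread, so take it as a rough suggestion) is to skip the custom sink entirely: convert the result table to a retract stream, filter out the retraction messages, and write the remaining rows with a plain FlinkKafkaProducer. A sketch against Flink 1.10; the topic, broker address, and the toString-based serialization are placeholders:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class InsertOnlyToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // ItemTransactions and Queries are assumed to be registered already (their DDL is not shown in the thread).
        Table result = tableEnv.sqlQuery(
            "SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity " +
            "FROM ItemTransactions AS t, Queries AS q " +
            "WHERE t.itemId = q.itemId AND " +
            "      t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time " +
            "GROUP BY t.itemId, q.event_time, q.queryId");

        // toRetractStream flags each change: true = accumulate (insert), false = retract.
        // Keeping only the accumulate messages gives a "latest results only" stream.
        DataStream<Row> inserts = tableEnv
            .toRetractStream(result, Row.class)
            .filter(change -> change.f0)
            .map(change -> change.f1)
            .returns(Types.ROW(Types.LONG, Types.STRING, Types.INT));

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "<broker>");

        // Placeholder serialization: Row.toString() as UTF-8; swap in a JSON schema for real use.
        SerializationSchema<Row> serializer =
            row -> row.toString().getBytes(StandardCharsets.UTF_8);

        inserts.addSink(new FlinkKafkaProducer<Row>("query.output.log.1", serializer, props));

        env.execute("insert-only results to Kafka");
    }
}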
On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:

Hi Gyula,

Could you provide the code of your Flink program, the error with the stacktrace, and the Flink version?

Thanks,
Roman

On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

Hi All!

Excuse my stupid question, I am pretty new to the Table/SQL API and I am trying to play around with it by implementing and running a few use cases.

I have a simple window join + aggregation, grouped on some id, that I want to write to Kafka, but I am hitting the following error:

"AppendStreamTableSink requires that Table has only insert changes."

If I understand correctly, the problem here is that since updates are possible within a single group, we have a retract stream, and the Kafka sink cannot handle that. I tried to search for a solution but haven't found any satisfying answers.

How can I simply tell the INSERT logic to ignore previous values and always just keep sending the latest (like you would see it on the CLI output)?

Thank you!
Gyula

--
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml