Thanks Kurt, I came to the same conclusions after trying what Jark
provided. I can get similar behaviour if I reduce the grouping window to 1
sec but still keep the join window large.
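
A minimal sketch of that variant, assuming the ItemTransactions, Queries and
QueryResult tables quoted further down the thread. The 5 second join bound is
only an illustrative value, and TUMBLE_START is left out of the select list
(an assumption) so that the columns match the three-column QueryResult schema:

```sql
-- Sketch only: Jark's query with the grouping window reduced to 1 second,
-- while the interval-join bound stays larger (5 seconds here, illustrative).
INSERT INTO QueryResult
SELECT
  q.queryId,
  t.itemId,
  SUM(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
  t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '1' SECOND);
```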

Gyula

On Fri, Mar 6, 2020 at 3:09 PM Kurt Young <ykt...@gmail.com> wrote:

> @Gyula Fóra <gyula.f...@gmail.com> I think your query is right; we should
> produce insert-only results if you have event time and watermarks defined.
> I've created https://issues.apache.org/jira/browse/FLINK-16466 to track
> this issue.
>
> Best,
> Kurt
>
>
> On Fri, Mar 6, 2020 at 12:14 PM Kurt Young <ykt...@gmail.com> wrote:
>
>> Actually, this use case led me to start thinking about one question:
>> if watermarks are enabled, could we also support GROUP BY event_time
>> instead of forcing the user to define a window based on event_time?
>>
>> GROUP BY on a standalone event_time can also be treated as a special
>> window whose start_time and end_time are both equal to event_time. Once
>> the watermark surpasses the event_time, we have the complete data for that
>> group, so we can do the required aggregation and emit insert-only results.
>>
>> That would ease the user's burden, since they would not have to define a
>> window when they already have event time and watermarks defined.
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Mar 6, 2020 at 10:26 AM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Gyula,
>>>
>>> Would a 5-second tumbling window for the aggregation meet your needs? For example:
>>>
>>> INSERT INTO QueryResult
>>> SELECT q.queryId, t.itemId,
>>>   TUMBLE_START(q.event_time, INTERVAL '5' SECOND), sum(t.quantity) AS quantity
>>> FROM
>>>   ItemTransactions AS t,
>>>   Queries AS q
>>> WHERE
>>>   t.itemId = q.itemId AND
>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>>> GROUP BY
>>>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> I see, maybe I just don't understand how to properly express what I am
>>>> trying to compute.
>>>>
>>>> Basically I want to aggregate the quantities of the transactions that
>>>> happened in the 5 seconds before the query.
>>>> Every query.id belongs to a single query (event_time, itemid) but
>>>> still I have to group :/
>>>>
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> I think the issue is caused not by the event time interval join but by
>>>>> the aggregation after the join:
>>>>>     GROUP BY t.itemId, q.event_time, q.queryId;
>>>>>
>>>>> In this case, Flink still has no way to determine whether groups like
>>>>> (itemId, event_time, queryId) have complete data or not.
>>>>> By comparison, if you change the grouping key to a window that is
>>>>> based only on q.event_time, the query would emit insert-only results.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> That's exactly the kind of behaviour I am looking for Kurt ("ignore
>>>>>> all delete messages").
>>>>>>
>>>>>> As for the data completion, in my above example it is basically an
>>>>>> event time interval join.
>>>>>> With watermarks defined Flink should be able to compute results once
>>>>>> in exactly the same way as for the tumbling window.
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>
>>>>>>> Back to this case, I assume you are expecting something like an
>>>>>>> "ignore all delete messages" flag? With this flag turned on, Flink
>>>>>>> would send to Kafka only the insert messages that correspond to the
>>>>>>> current correct results and drop all retractions and deletes on the fly.
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>>
>>>>>>>> > I also don't completely understand at this point why I can write
>>>>>>>> > the result of a group, tumble window aggregate to Kafka and not this
>>>>>>>> > window join / aggregate.
>>>>>>>>
>>>>>>>> If you are doing a tumble window aggregate with watermarks enabled,
>>>>>>>> Flink will fire a final result for each window only once; no
>>>>>>>> modifications or retractions will happen after a window is calculated
>>>>>>>> and fired. But with other arbitrary aggregations, there is not enough
>>>>>>>> information for Flink to determine whether the data is complete or not,
>>>>>>>> so the framework keeps calculating results as it receives new records
>>>>>>>> and retracts earlier results by firing retraction/deletion messages.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Kurt
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Benoît!
>>>>>>>>>
>>>>>>>>> I can see now how I can implement this myself through the provided
>>>>>>>>> sink interfaces, but I was trying to avoid having to write code for
>>>>>>>>> this :D
>>>>>>>>> My initial motivation was to see whether we are able to write out
>>>>>>>>> any kind of table to Kafka as a simple stream of "upserts".
>>>>>>>>>
>>>>>>>>> I also don't completely understand at this point why I can write
>>>>>>>>> the result of a group, tumble window aggregate to Kafka and not this
>>>>>>>>> window join / aggregate.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>>>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Gyula,
>>>>>>>>>>
>>>>>>>>>> I'm afraid the conversion needed to tell retractions from inserts
>>>>>>>>>> can't be done in pure SQL (though I'd love that feature).
>>>>>>>>>>
>>>>>>>>>> You might want to go lower level and implement a
>>>>>>>>>> RetractStreamTableSink [1][2] that you would wrap around a
>>>>>>>>>> KafkaTableSink [3]. This will give you an
>>>>>>>>>> emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
>>>>>>>>>> method, in which the Boolean flag will give you an 'accumulate' or
>>>>>>>>>> 'retract' signal.
>>>>>>>>>> You can then filter the DataStream accordingly before passing it to
>>>>>>>>>> the KafkaTableSink.
>>>>>>>>>>
>>>>>>>>>> Hope this helps.
>>>>>>>>>>
>>>>>>>>>> Best regards
>>>>>>>>>> Benoît
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>>>>>> [2]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>>>>>> [3]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Roman,
>>>>>>>>>>>
>>>>>>>>>>> This is the core logic:
>>>>>>>>>>>
>>>>>>>>>>> CREATE TABLE QueryResult (
>>>>>>>>>>>   queryId  BIGINT,
>>>>>>>>>>>   itemId   STRING,
>>>>>>>>>>>   quantity INT
>>>>>>>>>>> ) WITH (
>>>>>>>>>>>   'connector.type'    = 'kafka',
>>>>>>>>>>>   'connector.version' = 'universal',
>>>>>>>>>>>   'connector.topic'   = 'query.output.log.1',
>>>>>>>>>>>   'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>>>>>>   'format.type' = 'json'
>>>>>>>>>>> );
>>>>>>>>>>>
>>>>>>>>>>> INSERT INTO QueryResult
>>>>>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>>>>>> FROM
>>>>>>>>>>>   ItemTransactions AS t,
>>>>>>>>>>>   Queries AS q
>>>>>>>>>>> WHERE
>>>>>>>>>>>   t.itemId = q.itemId AND
>>>>>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>>>>>>>>>>> GROUP BY
>>>>>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>>>>>
>>>>>>>>>>> And the error I get is:
>>>>>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
>>>>>>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>>>>>>   at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>>>>>>   at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>>>>>>   at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>>>>>>   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>>>>>>   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>>>>>>   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>>>>>>   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>>>>>> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>>>>>>>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>>>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>>>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>>
>>>>>>>>>>> I am wondering what I could do to simply pump the result
>>>>>>>>>>> updates to Kafka here.
>>>>>>>>>>>
>>>>>>>>>>> Gyula
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>>>>>> khachatryan.ro...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Gyula,
>>>>>>>>>>>>
>>>>>>>>>>>> Could you provide the code of your Flink program, the error
>>>>>>>>>>>> with stacktrace and the Flink version?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Roman
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi All!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL
>>>>>>>>>>>>> API and I am trying to play around with it, implementing and
>>>>>>>>>>>>> running a few use-cases.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a simple window join + aggregation, grouped on some id,
>>>>>>>>>>>>> that I want to write to Kafka, but I am hitting the following
>>>>>>>>>>>>> error:
>>>>>>>>>>>>>
>>>>>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>>>>>> changes."
>>>>>>>>>>>>>
>>>>>>>>>>>>> If I understand correctly, the problem here is that since
>>>>>>>>>>>>> updates are possible within a single group, we have a retract
>>>>>>>>>>>>> stream and the Kafka Sink cannot handle that. I tried to search
>>>>>>>>>>>>> for the solution but I haven't found any satisfying answers.
>>>>>>>>>>>>>
>>>>>>>>>>>>> How can I simply tell the INSERT logic to ignore previous
>>>>>>>>>>>>> values and just always keep sending the latest (like you would
>>>>>>>>>>>>> see it on the CLI output)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you!
>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Benoît Paris
>>>>>>>>>> Ingénieur Machine Learning Explicable
>>>>>>>>>> Tél : +33 6 60 74 23 00
>>>>>>>>>> http://benoit.paris
>>>>>>>>>> http://explicable.ml
>>>>>>>>>>
>>>>>>>>>
