@Gyula Fóra <gyula.f...@gmail.com> I think your query is right: we should
produce insert-only results if you have event time and a watermark defined.
I've created https://issues.apache.org/jira/browse/FLINK-16466 to track this
issue.

Best,
Kurt


On Fri, Mar 6, 2020 at 12:14 PM Kurt Young <ykt...@gmail.com> wrote:

> Actually, this use case led me to start thinking about one question:
> if a watermark is enabled, could we also support GROUP BY event_time instead
> of forcing the user to define a window based on event_time?
>
> GROUP BY on a standalone event_time can also be treated as a special window
> whose start_time and end_time both equal event_time. When the watermark
> passes the event_time, we have the complete data for that group, so we can
> do the required aggregation and then emit insert-only results.
>
> That would ease the user's burden: they would not have to define a window
> when they already have event time and a watermark defined.
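
A minimal sketch of the proposal above, written as a plain-Python simulation rather than Flink code. The event tuples, the function name, and the rule that a group is complete once the watermark is at or past its event_time are assumptions made for illustration:

```python
def aggregate_by_event_time(events):
    """Sum values per event_time and emit each group once, when it is complete.

    `events` is a sequence of ("record", event_time, value) and
    ("watermark", timestamp) tuples; this encoding is hypothetical.
    """
    pending = {}                        # event_time -> running sum, not yet emitted
    current_watermark = float("-inf")
    emitted = []                        # insert-only output: (event_time, total)
    for event in events:
        if event[0] == "record":
            _, event_time, value = event
            if event_time <= current_watermark:
                continue                # late record: its group was already finalized
            pending[event_time] = pending.get(event_time, 0) + value
        else:
            _, current_watermark = event
            # Assumption: watermark t means no more records with event_time <= t.
            for ts in sorted(t for t in pending if t <= current_watermark):
                emitted.append((ts, pending.pop(ts)))
    return emitted


results = aggregate_by_event_time([
    ("record", 1, 10), ("record", 2, 5), ("record", 1, 4),
    ("watermark", 1),
    ("record", 2, 1), ("record", 1, 99),    # the second record is late and dropped
    ("watermark", 2),
])
print(results)
```

Because every group is emitted exactly once, the output contains only inserts and never needs a retraction.
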
>
> Best,
> Kurt
>
>
> On Fri, Mar 6, 2020 at 10:26 AM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Would a 5-second tumbling window for the aggregation meet your need? For example:
>>
>> INSERT INTO QueryResult
>> SELECT
>>   q.queryId,
>>   t.itemId,
>>   TUMBLE_START(q.event_time, INTERVAL '5' SECOND),
>>   sum(t.quantity) AS quantity
>> FROM
>>   ItemTransactions AS t,
>>   Queries AS q
>> WHERE
>>   t.itemId = q.itemId AND
>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>> GROUP BY
>>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>>
>> Best,
>> Jark
>>
>> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> I see, maybe I just don't understand how to properly express what I am
>>> trying to compute.
>>>
>>> Basically I want to aggregate the quantities of the transactions that
>>> happened in the 5 seconds before the query.
>>> Every query.id belongs to a single query (event_time, itemid) but still
>>> I have to group :/
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> I think the issue is not caused by the event time interval join, but by
>>>> the aggregation after the join:
>>>>     GROUP BY t.itemId, q.event_time, q.queryId;
>>>>
>>>> In this case, Flink still has no way to determine whether groups like
>>>> (itemId, event_time, queryId) have complete data or not.
>>>> By comparison, if you change the grouping key to a window that is based
>>>> only on q.event_time, then the query would emit insert-only results.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gyula.f...@gmail.com>
>>>> wrote:
>>>>
>>>>> That's exactly the kind of behaviour I am looking for Kurt ("ignore
>>>>> all delete messages").
>>>>>
>>>>> As for the data completion, in my above example it is basically an
>>>>> event time interval join.
>>>>> With watermarks defined Flink should be able to compute results once
>>>>> in exactly the same way as for the tumbling window.
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>>
>>>>>> Back to this case, I assume you are expecting something like an
>>>>>> "ignore all delete messages" flag? With this flag turned on, Flink
>>>>>> would send to Kafka only the insert messages that correspond to the
>>>>>> current correct results, and drop all retractions and deletes on the fly.
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>
>>>>>>> > I also don't completely understand at this point why I can write
>>>>>>> > the result of a group, tumble window aggregate to Kafka and not this
>>>>>>> > window join / aggregate.
>>>>>>>
>>>>>>> If you are doing a tumble window aggregate with a watermark enabled,
>>>>>>> Flink fires exactly one final result for each window; no modifications
>>>>>>> or retractions happen after a window has been calculated and fired.
>>>>>>> But with other, arbitrary aggregations, there is not enough
>>>>>>> information for Flink to determine whether the data is complete, so
>>>>>>> the framework keeps recalculating results as new records arrive and
>>>>>>> retracts earlier results by firing retraction/deletion messages.
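
To make the second case concrete, here is a small plain-Python simulation (not Flink code) of a non-windowed GROUP BY with a running sum. The function name and the (flag, key, total) message layout are assumptions for illustration, loosely modelled on the accumulate/retract Boolean flag mentioned elsewhere in this thread:

```python
def running_sum_changelog(records):
    """Simulate GROUP BY key with SUM(value) on an unbounded stream.

    Yields (flag, key, total), where flag True means 'accumulate'
    and flag False means 'retract'.
    """
    totals = {}
    for key, value in records:
        if key in totals:
            # The previously emitted result for this key is now stale,
            # so it has to be retracted before the new one is sent.
            yield (False, key, totals[key])
        totals[key] = totals.get(key, 0) + value
        yield (True, key, totals[key])


changelog = list(running_sum_changelog([("a", 1), ("b", 5), ("a", 2)]))
for message in changelog:
    print(message)
```

The second record for key "a" produces two messages: a retraction of the old total followed by the new total. An append-only sink cannot represent the retraction.
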
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Benoît!
>>>>>>>>
>>>>>>>> I can see now how I can implement this myself through the provided
>>>>>>>> sink interfaces but I was trying to avoid having to write code for 
>>>>>>>> this :D
>>>>>>>> My initial motivation was to see whether we are able to write out
>>>>>>>> any kind of table to Kafka as a simple stream of "upserts".
>>>>>>>>
>>>>>>>> I also don't completely understand at this point why I can write
>>>>>>>> the result of a group, tumble window aggregate to Kafka and not this
>>>>>>>> window join / aggregate.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Gyula,
>>>>>>>>>
>>>>>>>>> I'm afraid that telling retractions apart from inserts can't be
>>>>>>>>> done in pure SQL (though I'd love that feature).
>>>>>>>>>
>>>>>>>>> You might want to go lower level and implement a
>>>>>>>>> RetractStreamTableSink [1][2] that you would wrap around a
>>>>>>>>> KafkaTableSink [3]. This will give you an
>>>>>>>>> emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
>>>>>>>>> in which the Boolean flag will give you an 'accumulate' or
>>>>>>>>> 'retract' signal.
>>>>>>>>> You can then filter the DataStream accordingly before passing it to
>>>>>>>>> the KafkaTableSink.
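
The filtering step described above can be sketched in plain Python, without any Flink dependency. The (flag, row) pairs stand in for Tuple2<Boolean, T>, and the function name and sample rows are hypothetical:

```python
def keep_accumulates(changelog):
    """Drop retract messages (flag False) and keep the rows of accumulate messages."""
    return [row for is_accumulate, row in changelog if is_accumulate]


# Sample retract stream for one (queryId, itemId) group whose sum changes from 3 to 7.
changelog = [
    (True, ("q1", "item-1", 3)),
    (False, ("q1", "item-1", 3)),
    (True, ("q1", "item-1", 7)),
]
latest_only = keep_accumulates(changelog)
print(latest_only)
```

Note that the consumer of the resulting topic still sees every intermediate value, so it has to treat a later message for the same key as replacing the earlier one.
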
>>>>>>>>>
>>>>>>>>> Hope this helps.
>>>>>>>>>
>>>>>>>>> Best regards
>>>>>>>>> Benoît
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>>>>> [3]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Roman,
>>>>>>>>>>
>>>>>>>>>> This is the core logic:
>>>>>>>>>>
>>>>>>>>>> CREATE TABLE QueryResult (
>>>>>>>>>>   queryId  BIGINT,
>>>>>>>>>>   itemId   STRING,
>>>>>>>>>>   quantity INT
>>>>>>>>>> ) WITH (
>>>>>>>>>>   'connector.type'    = 'kafka',
>>>>>>>>>>   'connector.version' = 'universal',
>>>>>>>>>>   'connector.topic'   = 'query.output.log.1',
>>>>>>>>>>   'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>>>>>   'format.type' = 'json'
>>>>>>>>>> );
>>>>>>>>>>
>>>>>>>>>> INSERT INTO QueryResult
>>>>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>>>>> FROM
>>>>>>>>>>   ItemTransactions AS t,
>>>>>>>>>>   Queries AS q
>>>>>>>>>> WHERE
>>>>>>>>>>   t.itemId = q.itemId AND
>>>>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>>>>>>>>>> GROUP BY
>>>>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>>>>
>>>>>>>>>> And the error I get is:
>>>>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
>>>>>>>>>> at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>>>>> at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>>>>> at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>>>>> at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>>>>> at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>>>>> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>>>>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>>>>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>>>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>>>>> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>>>>>>>>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>>>>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>
>>>>>>>>>> I am wondering what I could do to simply pump the result
>>>>>>>>>> updates to Kafka here.
>>>>>>>>>>
>>>>>>>>>> Gyula
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>>>>> khachatryan.ro...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Gyula,
>>>>>>>>>>>
>>>>>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>>>>>> stacktrace and the Flink version?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Roman
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All!
>>>>>>>>>>>>
>>>>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API
>>>>>>>>>>>> and I am playing around with it by implementing and running a few
>>>>>>>>>>>> use cases.
>>>>>>>>>>>>
>>>>>>>>>>>> I have a simple window join + aggregation, grouped on some id
>>>>>>>>>>>> that I want to write to Kafka but I am hitting the following error:
>>>>>>>>>>>>
>>>>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>>>>> changes."
>>>>>>>>>>>>
>>>>>>>>>>>> If I understand correctly the problem here is that since
>>>>>>>>>>>> updates are possible within a single group, we have a retract 
>>>>>>>>>>>> stream and
>>>>>>>>>>>> the Kafka Sink cannot handle that. I tried to search for the 
>>>>>>>>>>>> solution but I
>>>>>>>>>>>> haven't found any satisfying answers.
>>>>>>>>>>>>
>>>>>>>>>>>> How can I simply tell the INSERT logic to ignore previous
>>>>>>>>>>>> values and just always keep sending the latest (like you would see
>>>>>>>>>>>> it on the CLI output)?
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you!
>>>>>>>>>>>> Gyula
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Benoît Paris
>>>>>>>>> Ingénieur Machine Learning Explicable
>>>>>>>>> Tél : +33 6 60 74 23 00
>>>>>>>>> http://benoit.paris
>>>>>>>>> http://explicable.ml
>>>>>>>>>
>>>>>>>>
