Re: Writing retract streams to Kafka

2020-03-06 Thread Gyula Fóra
Thanks Kurt, I came to the same conclusions after trying what Jark provided. I can get similar behaviour if I reduce the grouping window to 1 sec but still keep the join window large. Gyula On Fri, Mar 6, 2020 at 3:09 PM Kurt Young wrote: > @Gyula Fóra I think your query is right, we should >
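
A sketch of the combination Gyula describes, assuming the table names and the 5-second join interval used elsewhere in this thread (the actual query is not shown): a 1-second tumbling group window layered on the larger interval join.

INSERT INTO QueryResult
SELECT
  q.queryId,
  t.itemId,
  SUM(t.quantity) AS quantity
FROM ItemTransactions AS t, Queries AS q
WHERE t.itemId = q.itemId
  AND t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  q.queryId,
  t.itemId,
  TUMBLE(q.event_time, INTERVAL '1' SECOND);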

Re: Writing retract streams to Kafka

2020-03-06 Thread Kurt Young
@Gyula Fóra I think your query is right, we should produce insert-only results if you have event time and watermark defined. I've created https://issues.apache.org/jira/browse/FLINK-16466 to track this issue. Best, Kurt On Fri, Mar 6, 2020 at 12:14 PM Kurt Young wrote: > Actually this use case

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
Actually this use case led me to start thinking about one question: if watermark is enabled, could we also support GROUP BY event_time instead of forcing users to define a window based on the event_time? GROUP BY a standalone event_time can also be treated as a special window, which has both start_
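
A sketch of the proposal against the transaction table from this thread (this is the idea under discussion, not something Flink 1.10 accepts as an append-only query):

-- A bare, watermarked event_time key would behave like a zero-length
-- window whose start and end both equal event_time.
SELECT itemId, event_time, SUM(quantity) AS quantity
FROM ItemTransactions
GROUP BY itemId, event_time;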

Re: Writing retract streams to Kafka

2020-03-05 Thread Jark Wu
Hi Gyula, Does a tumbling window of 5 seconds for the aggregation meet your need? For example:

INSERT INTO QueryResult
SELECT
  q.queryId,
  t.itemId,
  TUMBLE_START(q.event_time, INTERVAL '5' SECOND),
  sum(t.quantity) AS quantity
FROM ItemTransactions AS t, Queries AS q
WHERE t.itemId = q.itemId
  AND t.event_ti
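
The preview cuts the query off at the join condition. A plausible completion, pieced together from the fragments quoted in the other messages of this thread (and assuming the QueryResult sink gains a window-start column to hold the TUMBLE_START value), might be:

INSERT INTO QueryResult
SELECT
  q.queryId,
  t.itemId,
  TUMBLE_START(q.event_time, INTERVAL '5' SECOND),
  sum(t.quantity) AS quantity
FROM ItemTransactions AS t, Queries AS q
WHERE t.itemId = q.itemId
  AND t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY q.queryId, t.itemId, TUMBLE(q.event_time, INTERVAL '5' SECOND);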

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
I see, maybe I just don't understand how to properly express what I am trying to compute. Basically I want to aggregate the quantities of the transactions that happened in the 5 seconds before the query. Every query.id belongs to a single query (event_time, itemId), but still I have to group :/ Gyu
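
Reconstructed from the GROUP BY clause Kurt quotes in his reply, the query being discussed is presumably of this shape (the interval bounds are an assumption based on the "5 seconds before the query" description):

INSERT INTO QueryResult
SELECT q.queryId, t.itemId, SUM(t.quantity) AS quantity
FROM ItemTransactions AS t, Queries AS q
WHERE t.itemId = q.itemId
  AND t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY t.itemId, q.event_time, q.queryId;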

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
I think the issue is not caused by the event-time interval join, but by the aggregation after the join: GROUP BY t.itemId, q.event_time, q.queryId. In this case, there is still no chance for Flink to determine whether groups like (itemId, event_time, queryId) have complete data or not. As a compar

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
That's exactly the kind of behaviour I am looking for, Kurt ("ignore all delete messages"). As for data completeness, my example above is basically an event-time interval join. With watermarks defined, Flink should be able to compute results once, in exactly the same way as for the tumbling w

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
Back to this case, I assume you are expecting something like an "ignore all delete messages" flag? With this flag turned on, Flink would only send insert messages, corresponding to the currently correct results, to Kafka, and drop all retractions and deletes on the fly. Best, Kurt On Thu, Mar 5, 2020 at 1
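
No such flag exists in Flink 1.10, but the same effect can be approximated by leaving SQL for the final step. A minimal, untested sketch using the Table API's retract-stream conversion (the query string and job name are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class InsertsOnlyToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Placeholder: the aggregating query from this thread.
        Table result = tEnv.sqlQuery("SELECT ...");

        // toRetractStream tags each record: f0 == true is an insert/update,
        // f0 == false is a retraction of a previously emitted row.
        DataStream<Row> insertsOnly = tEnv.toRetractStream(result, Row.class)
            .filter(change -> change.f0)   // drop retractions on the fly
            .map(change -> change.f1)      // keep only the row payload
            .returns(Row.class);           // type hint needed for the lambda

        // insertsOnly can now go to any append-only sink, e.g. a FlinkKafkaProducer.
        insertsOnly.print();
        env.execute("inserts-only");
    }
}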

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
> I also don't completely understand at this point why I can write the result of a group, tumble window aggregate to Kafka and not this window join / aggregate. If you are doing a tumble window aggregate with watermarks enabled, Flink will only fire one final result for each window, no modifi
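
For comparison, a sketch of such a tumbling-window aggregate over the transaction table from this thread: once the watermark passes a window's end, the window fires exactly one row, so the result is append-only and a plain Kafka sink accepts it.

SELECT
  itemId,
  TUMBLE_START(event_time, INTERVAL '5' SECOND) AS window_start,
  SUM(quantity) AS quantity
FROM ItemTransactions
GROUP BY itemId, TUMBLE(event_time, INTERVAL '5' SECOND);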

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
Thanks Benoît! I can see now how I can implement this myself through the provided sink interfaces but I was trying to avoid having to write code for this :D My initial motivation was to see whether we are able to write out any kind of table to Kafka as a simple stream of "upserts". I also don't c

Re: Writing retract streams to Kafka

2020-03-05 Thread Benoît Paris
Hi Gyula, I'm afraid the conversion to expose retractions vs. inserts can't be done in pure SQL (though I'd love that feature). You might want to go lower level and implement a RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink [3]. This will give you an emitDataStream(DataStrea
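
A rough, untested sketch of the wrapper Benoît describes, written against the Flink 1.10 sink interfaces (the class name is made up, and a real implementation would also need a proper configure()):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

// Hypothetical wrapper: forwards inserts to a wrapped append-only sink
// (e.g. a KafkaTableSink) and silently drops retractions.
public class InsertOnlyWrappingSink implements RetractStreamTableSink<Row> {

    private final AppendStreamTableSink<Row> inner; // e.g. a KafkaTableSink

    public InsertOnlyWrappingSink(AppendStreamTableSink<Row> inner) {
        this.inner = inner;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return inner.getOutputType();
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
        // f0 == true marks an insert/update message; everything else is
        // a retraction and is dropped before reaching the inner sink.
        inner.emitDataStream(stream
            .filter(change -> change.f0)
            .map(change -> change.f1)
            .returns(inner.getOutputType()));
    }

    @Override
    public String[] getFieldNames() {
        return inner.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return inner.getFieldTypes();
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(
            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // A real implementation would also reconfigure the inner sink here.
        return this;
    }
}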

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
Hi Roman, This is the core logic:

CREATE TABLE QueryResult (
  queryId  BIGINT,
  itemId   STRING,
  quantity INT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'query.output.log.1',
  'connector.properties.bootstrap.servers' = '',
  'format.type' =
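
The preview ends inside the WITH clause. For context, the two source tables implied by the column references in the rest of this thread presumably look roughly like the following; the watermark delay and connector properties are assumptions:

CREATE TABLE ItemTransactions (
  itemId STRING,
  quantity INT,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (...);  -- Kafka connector properties, as for QueryResult

CREATE TABLE Queries (
  queryId BIGINT,
  itemId STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (...);  -- Kafka connector properties, as for QueryResult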

Re: Writing retract streams to Kafka

2020-03-05 Thread Khachatryan Roman
Hi Gyula, Could you provide the code of your Flink program, the error with the stack trace, and the Flink version? Thanks, Roman On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra wrote: > Hi All! > > Excuse my stupid question, I am pretty new to the Table/SQL API and I am > trying to play around with it i

Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
Hi All! Excuse my stupid question, I am pretty new to the Table/SQL API and I am trying to play around with it, implementing and running a few use cases. I have a simple window join + aggregation, grouped on some id, that I want to write to Kafka, but I am hitting the following error: "AppendStream
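
The error message is truncated here; it is presumably Flink's standard "AppendStreamTableSink requires that Table has only insert changes." In other words, the Kafka table sink is append-only while the windowed join plus aggregation produces an updating (retract) table; that mismatch is exactly what the rest of this thread discusses.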