Re: Deduplicating record amplification

2021-01-29 Thread Rex Fenley
Great, thank you for the confirmation!

On Thu, Jan 28, 2021 at 11:25 PM Arvid Heise wrote:
> Hi Rex,
> there cannot be any late event in processing time by definition (maybe on a quantum computer?), so you should be fine. The timestamp of records in processing time is monotonically increasing.

Re: Deduplicating record amplification

2021-01-28 Thread Arvid Heise
Hi Rex,

there cannot be any late event in processing time by definition (maybe on a quantum computer?), so you should be fine. The timestamp of records in processing time is monotonically increasing.

Best,
Arvid

On Fri, Jan 29, 2021 at 1:14 AM Rex Fenley wrote:
> Switching to TumblingProcessingTimeWindows seems to have solved that problem.
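The point above can be sketched in plain Java (this is not the Flink API; window size and timestamps are illustrative): processing-time window assignment reads the local clock, which only moves forward, so an element can never land in a pane that has already closed.

```java
// A minimal sketch of why processing-time windows cannot see "late" elements:
// the timestamp used for window assignment comes from the local clock,
// so it never moves backwards.
public class ProcessingTimeWindowSketch {
    // Tumbling-window assignment: each timestamp maps to the start of its pane.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long size = 1000L; // 1-second tumbling windows
        long[] clockReadings = {1000, 1500, 1999, 2000, 2700}; // monotonic
        long lastWindow = Long.MIN_VALUE;
        for (long t : clockReadings) {
            long w = windowStart(t, size);
            if (w < lastWindow) throw new AssertionError("element fell into a closed pane");
            lastWindow = w;
            System.out.println("t=" + t + " -> window [" + w + ", " + (w + size) + ")");
        }
    }
}
```

With event time, by contrast, the timestamps come from the data and may decrease, which is exactly what creates late elements.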

Re: Deduplicating record amplification

2021-01-28 Thread Rex Fenley
Switching to TumblingProcessingTimeWindows seems to have solved that problem. For my own understanding, this won't have any "late", and therefore dropped, records, right? We cannot blindly drop a record from the aggregate evaluation; it just needs to take all the records it gets in a window and process them.

Re: Deduplicating record amplification

2021-01-28 Thread Rex Fenley
It looks like it wants me to call assignTimestampsAndWatermarks, but I already have a timer on my window; I'd expect everything entering this stream to simply be aggregated during that window:

.window(TumblingEventTimeWindows.of(Time.seconds(1)))

On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley wrote:

Re: Deduplicating record amplification

2021-01-28 Thread Rex Fenley
I think I may have been affected by some late-night programming. Slightly revised how I'm using my aggregate:

val userDocsStream = this.tableEnv
  .toRetractStream(userDocsTable, classOf[Row])
  .keyBy(_.f1.getField(0))
val compactedUserDocsStream = userDocsStream
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
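A plain-Java sketch of what this keyBy + tumbling-window pipeline is meant to achieve (the document-id/payload shape is an illustrative stand-in for the actual Row schema): within one window, only the last record seen per key survives.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulates per-window compaction: later arrivals for a key overwrite
// earlier ones, so each window emits one record per key.
public class KeyedWindowCompactionSketch {
    // records: each String[] is {documentId, payload}, in arrival order.
    static Map<String, String> compactOneWindow(List<String[]> records) {
        Map<String, String> latestPerKey = new LinkedHashMap<>();
        for (String[] r : records) {
            latestPerKey.put(r[0], r[1]); // a later arrival overwrites an earlier one
        }
        return latestPerKey;
    }

    public static void main(String[] args) {
        List<String[]> window = Arrays.asList(
            new String[]{"doc-1", "insert"},
            new String[]{"doc-1", "update"},
            new String[]{"doc-2", "insert"});
        System.out.println(compactOneWindow(window)); // {doc-1=update, doc-2=insert}
    }
}
```

This is the behavior that turns N amplified records per id into a single record per id per window.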

Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
Ok, that sounds like it confirms my expectations. So I tried running my above code and had to slightly edit it to use Java's Tuple2, because our execution environment stuff is all in Java.

class CompactionAggregate
  extends AggregateFunction[
    Tuple2[java.lang.Boolean, Row],
    Tuple2[java.lang.Boolean, Row],
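The contract such a compaction aggregate follows can be sketched without Flink on the classpath. The interface below is a local stand-in for Flink's AggregateFunction[IN, ACC, OUT] (same method names; String elements instead of Tuple2[Boolean, Row] for brevity): the accumulator simply holds the last value added.

```java
// A "keep last" aggregate: each add() overwrites the accumulator, so
// getResult() yields only the most recent record the window received.
public class CompactionAggregateSketch {
    // Local stand-in for Flink's AggregateFunction<IN, ACC, OUT> contract.
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    static class KeepLast implements Agg<String, String, String> {
        public String createAccumulator() { return null; }
        public String add(String value, String acc) { return value; } // overwrite
        public String getResult(String acc) { return acc; }
        public String merge(String a, String b) { return b != null ? b : a; }
    }

    // Drives the aggregate the way a window would: add each element, then emit.
    static String apply(KeepLast agg, String... values) {
        String acc = agg.createAccumulator();
        for (String v : values) acc = agg.add(v, acc);
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(apply(new KeepLast(), "insert", "retract", "insert-v2"));
        // prints insert-v2: only the newest record leaves the window
    }
}
```

Note this is only correct when elements reach add() in the intended order, which is exactly the ordering question the rest of the thread is about.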

Re: Deduplicating record amplification

2021-01-27 Thread Arvid Heise
Hi Rex, whether your keyBy (and likewise your join/grouping/windowing) is random or not depends on the relationship between the join/grouping key and your Kafka partitioning key. Say your partitioning key is document_id. Then any join/grouping key that is composed of (or is exactly) document_id will retain the ordering.
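The mechanism can be sketched as deterministic key routing (the hash function below is illustrative; Kafka and Flink each use their own): because every record for one document_id hashes to the same partition and the same subtask, the relative order of that id's records is preserved end to end.

```java
// Sketch of why a join/grouping key that contains the partitioning key
// preserves per-key order: routing is a pure function of the key, so all
// records for one document_id flow through a single channel, in order.
public class KeyRoutingSketch {
    static int subtaskFor(String documentId, int parallelism) {
        return Math.floorMod(documentId.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        int first = subtaskFor("doc-42", parallelism);
        for (int i = 0; i < 3; i++) {
            if (subtaskFor("doc-42", parallelism) != first)
                throw new AssertionError("routing must be deterministic per key");
        }
        System.out.println("doc-42 always routes to subtask " + first);
    }
}
```

If the join/grouping key does not contain the partitioning key, records for one document_id can take different routes, and cross-route order is undefined.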

Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
In addition to those questions, assuming that keyed streams are in order, I've come up with the following solution to compact our records and only pick the most recent one per id before sending to the ES sink. The first item in the Row is the document ID / primary key on which we want to compact records.

Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
This is great info, thanks! My question then becomes: what constitutes a random shuffle? Currently we're using the Table API with minibatch on Flink v1.11.3. Do our joins output a stream of records keyed by join key, or is this random? I imagine they'd have to have a key for retracts and accumulates.
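The distinction the question turns on can be sketched in plain Java (channel-selection logic here is illustrative, not Flink's internal implementation): a round-robin exchange scatters one key's records across channels, while a keyed exchange pins them to one channel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Contrast between a "random" redistribution (round-robin/rebalance) and a
// keyed exchange (hash of the key): only the latter keeps all records of one
// key on the same downstream channel, preserving their relative order.
public class ShuffleVsKeyBySketch {
    static List<Integer> roundRobin(int recordCount, int parallelism) {
        List<Integer> channels = new ArrayList<>();
        for (int i = 0; i < recordCount; i++) channels.add(i % parallelism);
        return channels;
    }

    static int keyedChannel(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // Four records of the same key under round-robin: spread over channels.
        System.out.println(roundRobin(4, 2)); // [0, 1, 0, 1]
        // The same four records under a keyed exchange: one channel, order intact.
        int c = keyedChannel("doc-1", 2);
        System.out.println(Arrays.asList(c, c, c, c));
    }
}
```

Under round-robin, two updates for one document can be processed by different subtasks, so which one "wins" downstream becomes timing-dependent.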

Re: Deduplicating record amplification

2021-01-27 Thread Arvid Heise
Hi Rex, indeed these two statements look like they contradict each other, but they are two sides of the same coin. Flink simply puts records into windows in FIFO order. That is, there is no ordering on event time if there are late events. So if your elements arrive ordered, the ordering is retained.
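The "two sides" can be made concrete with a small sketch (plain Java; the element shape is illustrative): a window buffers elements in arrival order, so "last element in the window" equals "latest event time" only when arrival order matches event-time order, and a single late event breaks the equivalence.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// A window holds elements in FIFO (arrival) order. These two selectors agree
// exactly when elements arrive in event-time order.
public class FifoWindowOrderSketch {
    // Each String[] is {eventTimeMillis, payload}; list order = arrival order.
    static String lastArrived(List<String[]> elements) {
        return elements.get(elements.size() - 1)[1];
    }

    static String latestByEventTime(List<String[]> elements) {
        return Collections.max(elements,
            Comparator.comparingLong((String[] e) -> Long.parseLong(e[0])))[1];
    }

    public static void main(String[] args) {
        List<String[]> window = Arrays.asList(
            new String[]{"2000", "newer"},
            new String[]{"1000", "late"}); // late event arrives last
        System.out.println(lastArrived(window));       // late
        System.out.println(latestByEventTime(window)); // newer
    }
}
```

A "keep last" compaction aggregate implicitly uses the lastArrived semantics, which is why ordered input (or an explicit event-time comparison) matters.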

Re: Deduplicating record amplification

2021-01-26 Thread Rex Fenley
Hello, I began reading https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows

> Redistributing streams (between map() and keyBy/window, as well as between keyBy/window and sink) change the partitioning of streams. Each operator subtask sends data to different target subtasks, depending on the selected transformation.

Re: Deduplicating record amplification

2021-01-26 Thread Rex Fenley
Our data arrives in order from Kafka, so we are hoping to use that same order for our processing.

On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley wrote:
> Going further, if "Flink provides no guarantees about the order of the elements within a window" then with minibatch, which I assume uses a window under the hood...

Re: Deduplicating record amplification

2021-01-26 Thread Rex Fenley
Going further, if "Flink provides no guarantees about the order of the elements within a window", then with minibatch, which I assume uses a window under the hood, any aggregates that expect rows to arrive in order will fail to keep their consistency. Is this correct? On Tue, Jan 26, 2021 at 5:36 PM