In addition to those questions, and assuming that keyed streams preserve order, I've come up with the following solution to compact our records and pick only the most recent one per id before sending to the ES sink.
The first item in the Row is the document ID / primary key that we want to compact records on.

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

val userDocsStream = userDocsTable
  .toRetractStream[Row]
  .keyBy(_._2.getField(0))

userDocsStream
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .aggregate(new CompactionAggregate())

class CompactionAggregate
    extends AggregateFunction[(Boolean, Row), (Boolean, Row), (Boolean, Row)] {

  override def createAccumulator() = (false, null)

  // Just take the latest value to compact.
  override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) = value

  override def getResult(accumulator: (Boolean, Row)) = accumulator

  // This is a required method that we don't use.
  override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
    throw new UnsupportedOperationException()
}

I'm hoping that if the last record in the window is an insert it picks that, and if it's a retract it picks that instead. Then, when we send this to the ES sink, we simply check true or false in the first element of the tuple to issue an insert or delete request to ES. Does this seem like it will work?

Thanks!

On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <r...@remind101.com> wrote:

> This is great info, thanks!
>
> My question then becomes: what constitutes a random shuffle? Currently we're using the Table API with minibatch on Flink v1.11.3. Do our joins output a keyed stream of records by join key, or is this random? I imagine they'd have to have a key for retracts and accumulates to arrive in order on the next downstream operator. Same with aggs, but on the groupBy key.
>
> Does this sound correct to you?
>
> Thanks!
>
> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Rex,
>>
>> indeed these two statements look like they contradict each other, but they are looking at both sides of the same coin.
>> Flink simply puts records into windows in FIFO order. That is, there is no ordering on event time if there are late events. So if your elements arrive ordered, the ordering is retained. If your elements arrive unordered, the same unordered order is retained.
>>
>> However, note that Flink can only guarantee FIFO according to your topology. Consider a source with parallelism 2, each instance reading data from an ordered Kafka partition (k1, k2) respectively. Each partition has records with keys, such that no key appears in both partitions (the default behavior if you set keys but no partition while writing to Kafka).
>>
>> 1) Let's assume you do a simple transformation and write the records back into Kafka with the same key. Then you can be sure that the order of the records is retained.
>>
>> 2) Now you add a random shuffle and then the transformation. Two successive records may be processed in parallel, and there is a race condition over which is written first into Kafka. So order is not retained.
>>
>> 3) You shuffle both partitions by the Kafka key (keyBy) and do some aggregation. Two successive records with the same key will always be processed by the same aggregation operator, so the order is retained for each key (note that this is what you usually want, and what Kafka gives you if you don't set the partition explicitly and just provide a key).
>>
>> 4) You shuffle both partitions by a different key. Then two successive Kafka records could again be processed in parallel, such that there is a race condition.
>>
>> Note that windows are a kind of aggregation.
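A minimal sketch of scenario 3, assuming a DataStream[(String, Long)] of Kafka key/value pairs (names here are illustrative, not from the thread): keyBy routes every record with the same key to the same subtask, so a keyed operator sees each key's records in source order.

import org.apache.flink.streaming.api.scala._

// Hypothetical stream of (key, value) pairs already read from Kafka.
val records: DataStream[(String, Long)] = ???

// All records for a given key go to the same subtask, so this keyed
// reduce sees them in their original per-key order and always keeps
// the most recently seen value per key.
val latestPerKey: DataStream[(String, Long)] = records
  .keyBy(_._1)
  .reduce((_, newer) => newer)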
>>
>> So Flink is never going to restore an ordering that is not there (because it's too costly and there are too many unknowns). But you can infer the guarantees by analyzing your topology.
>>
>> ---
>>
>> Please note that there is a common pitfall when you work with Kafka:
>> - Ordering of records in Kafka is only guaranteed if you set max.in.flight.requests.per.connection to 1. [1]
>> - Often you also want to set enable.idempotence and acks=all.
>>
>> That is true for the upstream application, and if you plan to write back to Kafka you also need to set that in Flink.
>>
>> [1] https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>>
>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Hello,
>>>
>>> I began reading https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>
>>> > Redistributing streams (between map() and keyBy/window, as well as between keyBy/window and sink) change the partitioning of streams. Each operator subtask sends data to different target subtasks, depending on the selected transformation. Examples are keyBy() (re-partitions by hash code), broadcast(), or rebalance() (random redistribution). In a redistributing exchange, order among elements is only preserved for each pair of sending and receiving tasks (for example, subtask[1] of map() and subtask[2] of keyBy/window).
>>>
>>> This makes it sound like ordering on the same partition/key is always maintained, which is exactly the ordering guarantee that I need. This seems to slightly contradict the statement "Flink provides no guarantees about the order of the elements within a window" for keyed state. So is it true that ordering _is_ guaranteed for identical keys?
>>>
>>> If I'm not mistaken, the state in the Table API is always considered keyed state for a join or aggregate. Or am I missing something?
>>>
>>> Thanks!
>>>
>>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Our data arrives in order from Kafka, so we are hoping to use that same order for our processing.
>>>>
>>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Going further, if "Flink provides no guarantees about the order of the elements within a window", then with minibatch, which I assume uses a window under the hood, any aggregates that expect rows to arrive in order will fail to keep their consistency. Is this correct?
>>>>>
>>>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> We have a job from CDC to a large unbounded Flink plan to Elasticsearch.
>>>>>>
>>>>>> Currently, we have been relentlessly trying to reduce our record amplification, which, when our Elasticsearch index is nearly fully populated, completely bottlenecks our write performance. We decided recently to try a new job using mini-batch. At first this seemed promising, but at some point we began getting huge record amplification in a join operator. It appears that minibatch may only batch on aggregate operators?
>>>>>>
>>>>>> So we're now thinking that we should have a window before our ES sink which only takes the last record for any unique document id in the window, since that's all we really want to send anyway.
>>>>>> However, when investigating turning a table into a keyed window stream for deduping, and then back into a table, I read the following:
>>>>>>
>>>>>> > Attention: Flink provides no guarantees about the order of the elements within a window. This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last. [1]
>>>>>>
>>>>>> which has put a damper on our investigation.
>>>>>>
>>>>>> I then found the deduplication SQL doc [2], but I have a hard time parsing what the SQL does, and we've never used TemporaryViews or proctime before.
>>>>>> Is this essentially what we want?
>>>>>> Will just using this SQL be safe for a job that is unbounded and just wants to deduplicate a document write to whatever the most current one is (i.e. will restoring from a checkpoint maintain our unbounded consistency, and will deletes work)?
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>>>>>
>>>>>> Thanks!

--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>
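For reference, the deduplication query in [2] boils down to keeping the first row per key of a ROW_NUMBER() ranking over a time attribute. A minimal sketch against an assumed user_docs table with doc_id and payload columns and a proctime attribute (all names are assumptions, not from the thread):

import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val tEnv: StreamTableEnvironment = ??? // obtained from the streaming environment elsewhere

// Keeps only the most recent row per doc_id in processing-time order,
// following the deduplication pattern documented in [2].
val deduped: Table = tEnv.sqlQuery(
  """SELECT doc_id, payload
    |FROM (
    |  SELECT doc_id, payload,
    |         ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY proctime DESC) AS row_num
    |  FROM user_docs
    |) WHERE row_num = 1""".stripMargin)

Ordering by proctime descending keeps the last row seen per key; on an updating input the result should itself be an updating stream, with a newer version of a document replacing the previous one downstream.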