Hi Rex,

indeed these two statements look like they contradict each other, but they are two sides of the same coin. Flink simply puts records into windows in FIFO order. That is, there is no ordering by event time if there are late events. So if your elements arrive in order, that order is retained. If your elements arrive out of order, the same out-of-order sequence is retained.
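To make the FIFO point concrete, here is a minimal sketch in plain Python. It is not Flink code: the `Record` type and the functions `window_contents` and `latest_per_key` are hypothetical names made up for illustration. It only models a window buffer as a list that keeps arrival order, and shows what "take the last record per document id" would mean under that model.

```python
from collections import namedtuple

# Hypothetical record type for illustration: key, event-time timestamp, payload.
Record = namedtuple("Record", ["key", "event_time", "value"])


def window_contents(arrivals):
    """Model a window buffer as a FIFO list: elements stay in arrival order."""
    buffer = []
    for record in arrivals:
        buffer.append(record)  # appended as it arrives, never re-sorted by event_time
    return buffer


def latest_per_key(buffer):
    """Keep the last-arrived record per key; later arrivals overwrite earlier ones."""
    latest = {}
    for record in buffer:
        latest[record.key] = record
    return list(latest.values())


arrivals = [
    Record("doc-1", 10, "a"),
    Record("doc-2", 12, "b"),
    Record("doc-1", 11, "c"),
    Record("doc-1", 9, "late"),  # late event: arrives last, smallest event time
]
buffer = window_contents(arrivals)
deduplicated = latest_per_key(buffer)
```

In this model the record kept for doc-1 is the late one, because "last" means last to arrive, not largest event time. If the input arrives in order, the two notions coincide.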
However, note that Flink can only guarantee FIFO according to your topology. Consider a source with parallelism 2, where each subtask reads from one ordered Kafka partition (k1 and k2, respectively). Each partition has records with keys, such that no key appears in both partitions (the default behavior if you set keys but no partition while writing to Kafka).

1) Let's assume you do a simple transformation and write the records back into Kafka with the same key. Then you can be sure that the order of the records is retained.
2) Now you add a random shuffle before the transformation. Two successive records may now be processed in parallel, and there is a race condition over which one is written to Kafka first. So order is not retained.
3) You shuffle both partitions by the Kafka key (keyBy) and do some aggregation. Two successive records with the same key will always be processed by the same aggregation operator. So the order is retained for each key (note that this is what you usually want, and what Kafka gives you if you don't set the partition explicitly and just provide a key).
4) You shuffle both partitions by a different key. Then two successive Kafka records could again be processed in parallel, such that there is a race condition.

Note that windows are a kind of aggregation.

So Flink is never going to restore an ordering that is not there (because it's too costly and there are too many unknowns). But you can infer the guarantees by analyzing your topology.

---

Please note that there is a common pitfall when you work with Kafka:
- Ordering of records in Kafka is only guaranteed if you set max.in.flight.requests.per.connection to 1. [1]
- Often you also want to set enable.idempotence and acks=all.

That is true for the upstream application, and if you plan to write back to Kafka you also need to set that in Flink.
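The four topologies described above can be compared with a small simulation. This is a sketch in plain Python, not Flink code, and it rests on several assumptions: the record layout `(kafka_key, sequence_number, other_field)`, the function names, and the byte-sum `stable_hash` are all made up for illustration. Round-robin stands in for the random shuffle so that the run is deterministic. Draining the highest-numbered subtask first is just one possible schedule, chosen to show what a race can do.

```python
from collections import defaultdict

PARALLELISM = 2


def stable_hash(value):
    """Toy deterministic hash (sum of bytes); a stand-in for real hash partitioning."""
    return sum(value.encode()) % PARALLELISM


def run(partitions, route):
    """Route every record to a subtask, then drain the subtasks one after another."""
    queues = defaultdict(list)
    for partition_id, records in enumerate(partitions):
        for position, record in enumerate(records):
            queues[route(partition_id, position, record)].append(record)
    output = []
    # One arbitrary schedule: highest subtask index first. Any interleaving is possible.
    for subtask in sorted(queues, reverse=True):
        output.extend(queues[subtask])
    return output


def per_key_order_retained(output):
    """True if sequence numbers never go backwards for any Kafka key."""
    last_seq = {}
    for key, seq, _other in output:
        if seq < last_seq.get(key, -1):
            return False
        last_seq[key] = seq
    return True


# Two ordered partitions; no Kafka key appears in both.
k1 = [("a", 0, "x"), ("a", 1, "y"), ("b", 0, "x"), ("a", 2, "x")]
k2 = [("c", 0, "y"), ("c", 1, "x")]

routes = {
    "forward": lambda p, i, r: p,                           # scenario 1
    "rebalance": lambda p, i, r: i % PARALLELISM,           # scenario 2 (round-robin)
    "key_by_kafka_key": lambda p, i, r: stable_hash(r[0]),  # scenario 3
    "key_by_other_field": lambda p, i, r: stable_hash(r[2]),  # scenario 4
}
results = {name: per_key_order_retained(run([k1, k2], route))
           for name, route in routes.items()}
```

Under this schedule, `results` reports per-key order as retained for `forward` and `key_by_kafka_key` and as broken for `rebalance` and `key_by_other_field`. In the two retained cases all records of a key travel through a single FIFO queue, so no schedule can reorder them. In the other two, records of one key are spread over several queues, so the outcome depends on timing.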
[1] https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html

On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <r...@remind101.com> wrote:

> Hello,
>
> I began reading
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>
>    - *Redistributing* streams (between *map()* and *keyBy/window*, as
>    well as between *keyBy/window* and *sink*) change the partitioning of
>    streams. Each *operator subtask* sends data to different target
>    subtasks, depending on the selected transformation. Examples are
>    *keyBy()* (re-partitions by hash code), *broadcast()*, or *rebalance()*
>    (random redistribution). In a *redistributing* exchange, order among
>    elements is only preserved for each pair of sending- and receiving task
>    (for example subtask[1] of *map()* and subtask[2] of *keyBy/window*).
>
> This makes it sounds like ordering on the same partition/key is always
> maintained. Which is exactly the ordering guarantee that I need. This seems
> to slightly contradict the statement "Flink provides no guarantees about
> the order of the elements within a window" for keyed state. So is it true
> that ordering _is_ guaranteed for identical keys?
>
> If I'm not mistaken, the state in the TableAPI is always considered keyed
> state for a join or aggregate. Or am I missing something?
>
> Thanks!
>
> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Our data arrives in order from Kafka, so we are hoping to use that same
>> order for our processing.
>>
>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Going further, if "Flink provides no guarantees about the order of the
>>> elements within a window" then with minibatch, which I assume uses a window
>>> under the hood, any aggregates that expect rows to arrive in order will
>>> fail to keep their consistency. Is this correct?
>>>
>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> We have a job from CDC to a large unbounded Flink plan to Elasticsearch.
>>>>
>>>> Currently, we have been relentlessly trying to reduce our record
>>>> amplification which, when our Elasticsearch index is near fully populated,
>>>> completely bottlenecks our write performance. We decided recently to try a
>>>> new job using mini-batch. At first this seemed promising but at some point
>>>> we began getting huge record amplification in a join operator. It appears
>>>> that minibatch may only batch on aggregate operators?
>>>>
>>>> So we're now thinking that we should have a window before our ES sink
>>>> which only takes the last record for any unique document id in the window,
>>>> since that's all we really want to send anyway. However, when investigating
>>>> turning a table, to a keyed window stream for deduping, and then back into
>>>> a table I read the following:
>>>>
>>>> >Attention Flink provides no guarantees about the order of the
>>>> elements within a window. This implies that although an evictor may remove
>>>> elements from the beginning of the window, these are not necessarily the
>>>> ones that arrive first or last. [1]
>>>>
>>>> which has put a damper on our investigation.
>>>>
>>>> I then found the deduplication SQL doc [2], but I have a hard time
>>>> parsing what the SQL does and we've never used TemporaryViews or proctime
>>>> before.
>>>> Is this essentially what we want?
>>>> Will just using this SQL be safe for a job that is unbounded and just
>>>> wants to deduplicate a document write to whatever the most current one is
>>>> (i.e. will restoring from a checkpoint maintain our unbounded consistency
>>>> and will deletes work)?
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>>>
>>>> Thanks!
>>>>
>>>> --
>>>>
>>>> Rex Fenley | Software Engineer - Mobile and Backend
>>>>
>>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>>>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
>>>> <https://www.facebook.com/remindhq>