Hi Rex, there cannot be any late event in processing time by definition (maybe on a quantum computer?), so you should be fine. The timestamps of records in processing time are monotonically increasing.
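For reference, an untested sketch of the processing-time variant, reusing the userDocsStream and CompactionAggregate from your snippets below:

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val compactedUserDocsStream = userDocsStream
  // Processing-time windows assign each record to whatever window is open
  // when it arrives, so no timestamps or watermarks are needed and no
  // record can ever be "late" and dropped.
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .aggregate(new CompactionAggregate())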
Best,

Arvid

On Fri, Jan 29, 2021 at 1:14 AM Rex Fenley <r...@remind101.com> wrote:

> Switching to TumblingProcessingTimeWindows seems to have solved that
> problem.
>
> For my own understanding, this won't have any "late" (and therefore
> dropped) records, right? We cannot blindly drop a record from the
> aggregate evaluation; it just needs to take all the records it gets in a
> window and process them, and then the aggregate will take whatever is
> last in order.
>
> Thanks!
>
> On Thu, Jan 28, 2021 at 1:01 PM Rex Fenley <r...@remind101.com> wrote:
>
>> It looks like it wants me to call assignTimestampsAndWatermarks, but I
>> already have a time window, and I'd expect everything entering this
>> stream to simply be aggregated during that window:
>>
>> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>
>> On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> I think I may have been affected by some late-night programming.
>>>
>>> Slightly revised how I'm using my aggregate:
>>>
>>> val userDocsStream =
>>>   this.tableEnv
>>>     .toRetractStream(userDocsTable, classOf[Row])
>>>     .keyBy(_.f1.getField(0))
>>> val compactedUserDocsStream = userDocsStream
>>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>   .aggregate(new CompactionAggregate())
>>>
>>> but this now gives me the following exception:
>>>
>>> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>>> timestamp marker). Is the time characteristic set to 'ProcessingTime',
>>> or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>>>   at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>>>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
>>>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>>>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>>>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>   at java.base/java.lang.Thread.run(Thread.java:829)
>>>
>>> which I'm not at all sure how to interpret.
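>>>
>>> If I stay with event time, my best guess is that I'd need to assign
>>> timestamps before the keyBy. A rough, untested sketch (the extractor is
>>> a placeholder, since our rows don't carry an event-time field):
>>>
>>> import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
>>>
>>> val userDocsStream =
>>>   this.tableEnv
>>>     .toRetractStream(userDocsTable, classOf[Row])
>>>     .assignTimestampsAndWatermarks(
>>>       WatermarkStrategy
>>>         .forMonotonousTimestamps[Tuple2[java.lang.Boolean, Row]]()
>>>         .withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[java.lang.Boolean, Row]] {
>>>           override def extractTimestamp(e: Tuple2[java.lang.Boolean, Row], ts: Long): Long =
>>>             System.currentTimeMillis() // placeholder: use a real event-time field here
>>>         }))
>>>     .keyBy(_.f1.getField(0))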
>>>
>>> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Ok, that sounds like it confirms my expectations.
>>>>
>>>> So I tried running my above code and had to edit it slightly to use
>>>> Java Tuple2, because our execution environment stuff is all in Java.
>>>>
>>>> class CompactionAggregate
>>>>     extends AggregateFunction[
>>>>       Tuple2[java.lang.Boolean, Row],
>>>>       Tuple2[java.lang.Boolean, Row],
>>>>       Tuple2[java.lang.Boolean, Row]
>>>>     ] {
>>>>
>>>>   override def createAccumulator() = new Tuple2(false, null)
>>>>
>>>>   // Just take the latest value to compact.
>>>>   override def add(
>>>>       value: Tuple2[java.lang.Boolean, Row],
>>>>       accumulator: Tuple2[java.lang.Boolean, Row]
>>>>   ) =
>>>>     value
>>>>
>>>>   override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>>>>     accumulator
>>>>
>>>>   // This is a required function that we don't use.
>>>>   override def merge(
>>>>       a: Tuple2[java.lang.Boolean, Row],
>>>>       b: Tuple2[java.lang.Boolean, Row]
>>>>   ) =
>>>>     throw new NotImplementedException()
>>>> }
>>>>
>>>> But when running I get the following error:
>>>>
>>>> > Caused by: java.lang.RuntimeException: Could not extract key from
>>>> > [redacted row]
>>>> > ...
>>>> > Caused by: org.apache.flink.table.api.ValidationException:
>>>> > Unsupported kind 'DELETE' of a row [redacted row]. Only rows with
>>>> > 'INSERT' kind are supported when converting to an expression.
>>>>
>>>> I've been googling around and haven't found anything informative about
>>>> what might be causing this issue. Any ideas?
>>>>
>>>> I'll also take a look at the SQL functions you suggested and see if I
>>>> can use those.
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Rex,
>>>>>
>>>>> whether your keyBy (and with it your join/grouping/windowing) is
>>>>> random or not depends on the relationship of the join/grouping key
>>>>> with your Kafka partitioning key.
>>>>>
>>>>> Say your partitioning key is document_id. Then any join/grouping key
>>>>> that is composed of (or is exactly) document_id will retain the
>>>>> order. You should always ask yourself the question: can two records
>>>>> coming from the ordered Kafka partition X be processed by two
>>>>> different operator instances? For a join/grouping operator, there is
>>>>> only the strict guarantee that all records with the same key will be
>>>>> shuffled into the same operator instance.
>>>>>
>>>>> Your compaction in general looks good, but I'm not deep into Table
>>>>> API. I'm quite sure that the FIRST_VALUE and LAST_VALUE functions in
>>>>> Table API should already do what you want. [1]
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
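>>>>>
>>>>> For example, something like this might already give you one
>>>>> compacted row per document id (untested sketch; the table and column
>>>>> names are placeholders):
>>>>>
>>>>> val compacted = tableEnv.sqlQuery(
>>>>>   """
>>>>>     |SELECT doc_id, LAST_VALUE(payload) AS payload
>>>>>     |FROM user_docs
>>>>>     |GROUP BY doc_id
>>>>>   """.stripMargin)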
>>>>>
>>>>> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> In addition to those questions, assuming that keyed streams are in
>>>>>> order, I've come up with the following solution to compact our
>>>>>> records and only pick the most recent one per id before sending to
>>>>>> the ES sink.
>>>>>>
>>>>>> The first item in the Row is the document ID / primary key, which we
>>>>>> want to compact records on.
>>>>>>
>>>>>> val userDocsStream =
>>>>>>   userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
>>>>>> userDocsStream
>>>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>>>>   .aggregate(new CompactionAggregate())
>>>>>>
>>>>>> class CompactionAggregate
>>>>>>     extends AggregateFunction[
>>>>>>       (Boolean, Row),
>>>>>>       (Boolean, Row),
>>>>>>       (Boolean, Row)
>>>>>>     ] {
>>>>>>
>>>>>>   override def createAccumulator() = (false, null)
>>>>>>
>>>>>>   // Just take the latest value to compact.
>>>>>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>>>>>     value
>>>>>>
>>>>>>   override def getResult(accumulator: (Boolean, Row)) = accumulator
>>>>>>
>>>>>>   // This is a required function that we don't use.
>>>>>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>>>>>     throw new NotImplementedException()
>>>>>> }
>>>>>>
>>>>>> I'm hoping that if the last record in the window is an insert, it
>>>>>> picks that, and if it's a retract, it picks that; then, when we send
>>>>>> this to the ES sink, we will simply check true or false in the first
>>>>>> element of the tuple for an insert or delete request to ES. Does
>>>>>> this seem like it will work?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>
>>>>>>> This is great info, thanks!
>>>>>>>
>>>>>>> My question then becomes: what constitutes a random shuffle?
>>>>>>> Currently we're using the Table API with minibatch on Flink
>>>>>>> v1.11.3. Do our joins output a stream of records keyed by join
>>>>>>> key, or is this random? I imagine that they'd have to have a key
>>>>>>> for retracts and accumulates to arrive in order on the next
>>>>>>> downstream operator. Same with aggs, but on the groupBy key.
>>>>>>>
>>>>>>> Does this sound correct to you?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <ar...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Rex,
>>>>>>>>
>>>>>>>> indeed these two statements look like they contradict each other,
>>>>>>>> but they are two sides of the same coin.
>>>>>>>> Flink simply puts records into windows in FIFO order. That is,
>>>>>>>> there is no ordering on event time if there are late events. So
>>>>>>>> if your elements arrive ordered, the ordering is retained. If
>>>>>>>> your elements arrive unordered, the same unordered order is
>>>>>>>> retained.
>>>>>>>>
>>>>>>>> However, note that Flink can only guarantee FIFO according to
>>>>>>>> your topology. Consider a source with parallelism 2, each
>>>>>>>> instance reading data from an ordered Kafka partition (k1, k2)
>>>>>>>> respectively. Each partition has records with keys, such that no
>>>>>>>> key appears in both partitions (the default behavior if you set
>>>>>>>> keys but no partition while writing to Kafka).
>>>>>>>>
>>>>>>>> 1) Let's assume you do a simple transformation and write the
>>>>>>>> records back into Kafka with the same key. Then you can be sure
>>>>>>>> that the order of the records is retained.
>>>>>>>>
>>>>>>>> 2) Now you add a random shuffle before the transformation. Two
>>>>>>>> successive records may be processed in parallel, and there is a
>>>>>>>> race condition over which is written first into Kafka. So order
>>>>>>>> is not retained.
>>>>>>>>
>>>>>>>> 3) You shuffle both partitions by the Kafka key (keyBy) and do
>>>>>>>> some aggregation. Two successive records with the same key will
>>>>>>>> always be processed by the same aggregation operator, so the
>>>>>>>> order is retained for each key. (Note that this is what you
>>>>>>>> usually want, and what Kafka gives you if you don't set the
>>>>>>>> partition explicitly and just provide a key.)
>>>>>>>>
>>>>>>>> 4) You shuffle both partitions by a different key. Then two
>>>>>>>> successive Kafka records could again be calculated in parallel,
>>>>>>>> such that there is a race condition.
>>>>>>>>
>>>>>>>> Note that windows are a kind of aggregation.
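>>>>>>>>
>>>>>>>> In code, the difference between 2) and 3) is just the
>>>>>>>> partitioner. A hypothetical sketch (source, transform, and
>>>>>>>> kafkaSink are placeholders):
>>>>>>>>
>>>>>>>> // 2) random redistribution: two successive records from the same
>>>>>>>> // ordered partition can race each other, so order is lost
>>>>>>>> source.rebalance().map(transform).addSink(kafkaSink)
>>>>>>>>
>>>>>>>> // 3) hash by the Kafka key: all records with the same key go to
>>>>>>>> // the same subtask, so per-key order is retained
>>>>>>>> source.keyBy(_.documentId).map(transform).addSink(kafkaSink)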
>>>>>>>>
>>>>>>>> So Flink is never going to restore an ordering that is not there
>>>>>>>> (because it's too costly and there are too many unknowns). But
>>>>>>>> you can infer the guarantees by analyzing your topology.
>>>>>>>>
>>>>>>>> ---
>>>>>>>>
>>>>>>>> Please note that there is a common pitfall when you work with
>>>>>>>> Kafka:
>>>>>>>> - Ordering of records in Kafka is only guaranteed if you set
>>>>>>>> max.in.flight.requests.per.connection to 1. [1]
>>>>>>>> - Often you also want to set enable.idempotence and acks=all.
>>>>>>>>
>>>>>>>> That is true for the upstream application, and if you plan to
>>>>>>>> write back to Kafka, you also need to set that in Flink.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
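>>>>>>>>
>>>>>>>> In producer-config terms, that is simply (sketch; the same keys
>>>>>>>> go into the properties you hand to Flink's Kafka producer):
>>>>>>>>
>>>>>>>> val props = new java.util.Properties()
>>>>>>>> // keep retries from reordering records within a partition
>>>>>>>> props.setProperty("max.in.flight.requests.per.connection", "1")
>>>>>>>> props.setProperty("enable.idempotence", "true")
>>>>>>>> props.setProperty("acks", "all")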
>>>>>>>>
>>>>>>>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I began reading
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>>>>>>>
>>>>>>>>> > *Redistributing* streams (between *map()* and *keyBy/window*,
>>>>>>>>> > as well as between *keyBy/window* and *sink*) change the
>>>>>>>>> > partitioning of streams. Each *operator subtask* sends data to
>>>>>>>>> > different target subtasks, depending on the selected
>>>>>>>>> > transformation. Examples are *keyBy()* (re-partitions by hash
>>>>>>>>> > code), *broadcast()*, or *rebalance()* (random
>>>>>>>>> > redistribution). In a *redistributing* exchange, order among
>>>>>>>>> > elements is only preserved for each pair of sending and
>>>>>>>>> > receiving task (for example subtask[1] of *map()* and
>>>>>>>>> > subtask[2] of *keyBy/window*).
>>>>>>>>>
>>>>>>>>> This makes it sound like ordering on the same partition/key is
>>>>>>>>> always maintained, which is exactly the ordering guarantee that
>>>>>>>>> I need. This seems to slightly contradict the statement "Flink
>>>>>>>>> provides no guarantees about the order of the elements within a
>>>>>>>>> window" for keyed state. So is it true that ordering _is_
>>>>>>>>> guaranteed for identical keys?
>>>>>>>>>
>>>>>>>>> If I'm not mistaken, the state in the Table API is always
>>>>>>>>> considered keyed state for a join or aggregate. Or am I missing
>>>>>>>>> something?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>
>>>>>>>>>> Our data arrives in order from Kafka, so we are hoping to use
>>>>>>>>>> that same order for our processing.
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Going further, if "Flink provides no guarantees about the
>>>>>>>>>>> order of the elements within a window", then with minibatch,
>>>>>>>>>>> which I assume uses a window under the hood, any aggregates
>>>>>>>>>>> that expect rows to arrive in order will fail to keep their
>>>>>>>>>>> consistency. Is this correct?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> We have a job that goes from CDC through a large unbounded
>>>>>>>>>>>> Flink plan to Elasticsearch.
>>>>>>>>>>>>
>>>>>>>>>>>> We have been relentlessly trying to reduce our record
>>>>>>>>>>>> amplification, which, when our Elasticsearch index is nearly
>>>>>>>>>>>> fully populated, completely bottlenecks our write
>>>>>>>>>>>> performance. We recently decided to try a new job using
>>>>>>>>>>>> mini-batch. At first this seemed promising, but at some
>>>>>>>>>>>> point we began getting huge record amplification in a join
>>>>>>>>>>>> operator. It appears that minibatch may only batch on
>>>>>>>>>>>> aggregate operators?
>>>>>>>>>>>>
>>>>>>>>>>>> So we're now thinking that we should have a window before
>>>>>>>>>>>> our ES sink which only takes the last record for any unique
>>>>>>>>>>>> document id in the window, since that's all we really want
>>>>>>>>>>>> to send anyway. However, when investigating turning a table
>>>>>>>>>>>> into a keyed window stream for deduping, and then back into
>>>>>>>>>>>> a table, I read the following:
>>>>>>>>>>>>
>>>>>>>>>>>> > Attention: Flink provides no guarantees about the order of
>>>>>>>>>>>> > the elements within a window. This implies that although
>>>>>>>>>>>> > an evictor may remove elements from the beginning of the
>>>>>>>>>>>> > window, these are not necessarily the ones that arrive
>>>>>>>>>>>> > first or last. [1]
>>>>>>>>>>>>
>>>>>>>>>>>> which has put a damper on our investigation.
>>>>>>>>>>>>
>>>>>>>>>>>> I then found the deduplication SQL doc [2], but I have a
>>>>>>>>>>>> hard time parsing what the SQL does, and we've never used
>>>>>>>>>>>> TemporaryViews or proctime before.
>>>>>>>>>>>> Is this essentially what we want?
>>>>>>>>>>>> Will just using this SQL be safe for a job that is unbounded
>>>>>>>>>>>> and just wants to deduplicate a document write to whatever
>>>>>>>>>>>> the most current one is (i.e. will restoring from a
>>>>>>>>>>>> checkpoint maintain our unbounded consistency, and will
>>>>>>>>>>>> deletes work)?
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>>>>>>>>>> [2]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
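>>>>>>>>>>>>
>>>>>>>>>>>> P.S. My tentative reading of the SQL in [2], in case it
>>>>>>>>>>>> helps frame the question (untested; the table and column
>>>>>>>>>>>> names are placeholders):
>>>>>>>>>>>>
>>>>>>>>>>>> val deduped = tableEnv.sqlQuery(
>>>>>>>>>>>>   """
>>>>>>>>>>>>     |SELECT doc_id, payload
>>>>>>>>>>>>     |FROM (
>>>>>>>>>>>>     |  SELECT *,
>>>>>>>>>>>>     |    -- number each document's rows, newest first by
>>>>>>>>>>>>     |    -- processing time
>>>>>>>>>>>>     |    ROW_NUMBER() OVER (
>>>>>>>>>>>>     |      PARTITION BY doc_id ORDER BY proctime DESC) AS rownum
>>>>>>>>>>>>     |  FROM user_docs)
>>>>>>>>>>>>     |WHERE rownum = 1
>>>>>>>>>>>>   """.stripMargin)
>>>>>>>>>>>>
>>>>>>>>>>>> i.e. keep only the newest row per doc_id. Is that the right
>>>>>>>>>>>> reading, and is it safe for our case?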