Switching to TumblingProcessingTimeWindows seems to have solved that problem.
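For reference, the only thing that changed is the window assigner — roughly the following (same key selector, aggregate, and 1-second window as in the snippets quoted below; imports shown for completeness):

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.types.Row

val userDocsStream =
  this.tableEnv
    .toRetractStream(userDocsTable, classOf[Row])
    .keyBy(_.f1.getField(0))
val compactedUserDocsStream = userDocsStream
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) // was TumblingEventTimeWindows
  .aggregate(new CompactionAggregate())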
For my own understanding, this won't have any "late" (and therefore dropped) records, right? We cannot blindly drop a record from the aggregate evaluation; it just needs to take all the records it gets in a window and process them, and then the aggregate will take whatever arrives last, in order. Thanks!

On Thu, Jan 28, 2021 at 1:01 PM Rex Fenley <r...@remind101.com> wrote:

> It looks like it wants me to call assignTimestampsAndWatermarks, but I already have a time window, and I'd expect everything entering this stream to simply be aggregated during that window:
> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>
> On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley <r...@remind101.com> wrote:
>
>> I think I may have been affected by some late-night programming.
>>
>> I slightly revised how I'm using my aggregate:
>>
>> val userDocsStream =
>>   this.tableEnv
>>     .toRetractStream(userDocsTable, classOf[Row])
>>     .keyBy(_.f1.getField(0))
>> val compactedUserDocsStream = userDocsStream
>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>   .aggregate(new CompactionAggregate())
>>
>> but this now gives me the following exception:
>>
>> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>>   at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
>>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>   at java.base/java.lang.Thread.run(Thread.java:829)
>>
>> I'm not at all sure how to interpret this.
>>
>> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Ok, that sounds like it confirms my expectations.
>>>
>>> So I tried running my above code, though I had to edit it slightly to use Java's Tuple2 because our execution environment stuff is all in Java.
>>>
>>> class CompactionAggregate
>>>   extends AggregateFunction[
>>>     Tuple2[java.lang.Boolean, Row],
>>>     Tuple2[java.lang.Boolean, Row],
>>>     Tuple2[java.lang.Boolean, Row]
>>>   ] {
>>>
>>>   override def createAccumulator() = new Tuple2(false, null)
>>>
>>>   // Just take the latest value to compact.
>>>   override def add(
>>>       value: Tuple2[java.lang.Boolean, Row],
>>>       accumulator: Tuple2[java.lang.Boolean, Row]
>>>   ) =
>>>     value
>>>
>>>   override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>>>     accumulator
>>>
>>>   // This is a required function that we don't use.
>>>   override def merge(
>>>       a: Tuple2[java.lang.Boolean, Row],
>>>       b: Tuple2[java.lang.Boolean, Row]
>>>   ) =
>>>     throw new NotImplementedException()
>>> }
>>>
>>> But when running I get the following error:
>>>
>>> > Caused by: java.lang.RuntimeException: Could not extract key from [redacted row]
>>> > ...
>>> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are supported when converting to an expression.
>>>
>>> I'm googling around and haven't found anything informative about what might be causing this issue. Any ideas?
>>>
>>> I'll also take a look at the SQL functions you suggested and see if I can use those.
>>>
>>> Thanks!
>>>
>>> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Rex,
>>>>
>>>> whether your keyBy (and, with it, the join/grouping/windowing) is random or not depends on the relationship of the join/grouping key with your Kafka partitioning key.
>>>>
>>>> Say your partitioning key is document_id. Then, any join/grouping key that is composed of (or is exactly) document_id will retain the order. You should always ask yourself the question: can two records coming from the ordered Kafka partition X be processed by two different operator instances? For a join/grouping operator, there is only the strict guarantee that all records with the same key will be shuffled into the same operator instance.
>>>>
>>>> Your compaction in general looks good, but I'm not deep into the Table API. I'm quite sure that the FIRST_VALUE and LAST_VALUE functions in the Table API should already do what you want. [1]
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>>>>
>>>> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> In addition to those questions, assuming that keyed streams are in order, I've come up with the following solution to compact our records and only pick the most recent one per id before sending to the ES sink.
>>>>>
>>>>> The first item in the Row is the document ID / primary key which we want to compact records on.
>>>>>
>>>>> val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
>>>>> userDocsStream
>>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>>>   .aggregate(new CompactionAggregate())
>>>>>
>>>>> class CompactionAggregate
>>>>>   extends AggregateFunction[
>>>>>     (Boolean, Row),
>>>>>     (Boolean, Row),
>>>>>     (Boolean, Row)
>>>>>   ] {
>>>>>
>>>>>   override def createAccumulator() = (false, null)
>>>>>
>>>>>   // Just take the latest value to compact.
>>>>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>>>>     value
>>>>>
>>>>>   override def getResult(accumulator: (Boolean, Row)) =
>>>>>     accumulator
>>>>>
>>>>>   // This is a required function that we don't use.
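>>>>>   // merge() is only required by merging window assigners such as
>>>>>   // session windows; a tumbling window never calls it, so throwing
>>>>>   // here is safe.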
>>>>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>>>>     throw new NotImplementedException()
>>>>> }
>>>>>
>>>>> I'm hoping that if the last record in the window is an insert it picks that, and if it's a retract then it picks that, and then when we send this to the ES sink we will simply check true or false in the first element of the tuple for an insert or delete request to ES. Does this seem like it will work?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> This is great info, thanks!
>>>>>>
>>>>>> My question then becomes, what constitutes a random shuffle? Currently we're using the Table API with minibatch on Flink v1.11.3. Do our joins output a keyed stream of records by join key, or is this random? I imagine that they'd have to have a key for retracts and accumulates to arrive in order on the next downstream operator. Same with aggs, but on the groupBy key.
>>>>>>
>>>>>> Does this sound correct to you?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Rex,
>>>>>>>
>>>>>>> indeed these two statements look like they contradict each other, but they are two sides of the same coin.
>>>>>>> Flink simply puts records into windows in FIFO order. That is, there is no ordering on event time if there are late events. So if your elements arrive ordered, the ordering is retained. If your elements arrive unordered, the same unordered order is retained.
>>>>>>>
>>>>>>> However, note that Flink can only guarantee FIFO according to your topology. Consider a source with parallelism 2, each subtask reading data from an ordered Kafka partition (k1 and k2, respectively). Each partition has records with keys, such that no key appears in both partitions (the default behavior if you set keys but no partition while writing to Kafka).
>>>>>>> 1) Let's assume you do a simple transformation and write the records back into Kafka with the same key. Then you can be sure that the order of the records is retained.
>>>>>>> 2) Now you add a random shuffle before the transformation. Two successive records may now be processed in parallel, and there is a race condition as to which is written first into Kafka. So order is not retained.
>>>>>>> 3) You shuffle both partitions by the Kafka key (keyBy) and do some aggregation. Two successive records with the same key will always be processed by the same aggregation operator. So the order is retained for each key (note that this is what you usually want, and what Kafka gives you if you don't set the partition explicitly and just provide a key).
>>>>>>> 4) You shuffle both partitions by a different key. Then two successive Kafka records could again be processed in parallel, such that there is a race condition.
>>>>>>>
>>>>>>> Note that windows are a kind of aggregation.
>>>>>>>
>>>>>>> So Flink is never going to restore an ordering that is not there (because it's too costly and there are too many unknowns). But you can infer the guarantees by analyzing your topology.
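To make scenarios 2) and 3) above concrete, a small sketch (the (documentId, version) element type, the placeholder source, and the rolling reduce are illustrative assumptions, not the actual job):

import org.apache.flink.streaming.api.scala._

// Placeholder: an ordered stream of (documentId, version) pairs, e.g. read from Kafka.
val records: DataStream[(String, Long)] = ???

// Scenario 3): key by the field the Kafka topic was keyed on. All records for a given
// documentId are processed by the same operator instance, so per-key arrival order is kept.
val perKeyOrdered = records
  .keyBy(_._1)
  .reduce((_, latest) => latest) // e.g. keep only the latest version per document

// Scenario 2): a random redistribution before the transformation. Two successive records
// for the same documentId may now be processed by different parallel instances, so the
// order seen downstream is no longer guaranteed.
val orderNotGuaranteed = records
  .rebalance
  .map(identity)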
>>>>>>>
>>>>>>> ---
>>>>>>>
>>>>>>> Please note that there is a common pitfall when you work with Kafka:
>>>>>>> - Ordering of records in Kafka is only guaranteed if you set max.in.flight.requests.per.connection to 1. [1]
>>>>>>> - Often you also want to set enable.idempotence and acks=all.
>>>>>>>
>>>>>>> That is true for the upstream application, and if you plan to write back to Kafka you also need to set that in Flink.
>>>>>>>
>>>>>>> [1] https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>>>>>>>
>>>>>>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I began reading https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>>>>>>
>>>>>>>> > Redistributing streams (between map() and keyBy/window, as well as between keyBy/window and sink) change the partitioning of streams. Each operator subtask sends data to different target subtasks, depending on the selected transformation. Examples are keyBy() (re-partitions by hash code), broadcast(), or rebalance() (random redistribution). In a redistributing exchange, order among elements is only preserved for each pair of sending- and receiving task (for example subtask[1] of map() and subtask[2] of keyBy/window).
>>>>>>>>
>>>>>>>> This makes it sound like ordering on the same partition/key is always maintained, which is exactly the ordering guarantee that I need. This seems to slightly contradict the statement "Flink provides no guarantees about the order of the elements within a window" for keyed state. So is it true that ordering _is_ guaranteed for identical keys?
>>>>>>>>
>>>>>>>> If I'm not mistaken, the state in the Table API is always considered keyed state for a join or aggregate. Or am I missing something?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>
>>>>>>>>> Our data arrives in order from Kafka, so we are hoping to use that same order for our processing.
>>>>>>>>>
>>>>>>>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>
>>>>>>>>>> Going further, if "Flink provides no guarantees about the order of the elements within a window", then with minibatch, which I assume uses a window under the hood, any aggregates that expect rows to arrive in order will fail to keep their consistency. Is this correct?
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> We have a job from CDC to a large unbounded Flink plan to Elasticsearch.
>>>>>>>>>>>
>>>>>>>>>>> Currently, we have been relentlessly trying to reduce our record amplification which, when our Elasticsearch index is near fully populated, completely bottlenecks our write performance. We decided recently to try a new job using mini-batch. At first this seemed promising, but at some point we began getting huge record amplification in a join operator.
It >>>>>>>>>>> appears >>>>>>>>>>> that minibatch may only batch on aggregate operators? >>>>>>>>>>> >>>>>>>>>>> So we're now thinking that we should have a window before our ES >>>>>>>>>>> sink which only takes the last record for any unique document id in >>>>>>>>>>> the >>>>>>>>>>> window, since that's all we really want to send anyway. However, >>>>>>>>>>> when >>>>>>>>>>> investigating turning a table, to a keyed window stream for >>>>>>>>>>> deduping, and >>>>>>>>>>> then back into a table I read the following: >>>>>>>>>>> >>>>>>>>>>> >Attention Flink provides no guarantees about the order of the >>>>>>>>>>> elements within a window. This implies that although an evictor may >>>>>>>>>>> remove >>>>>>>>>>> elements from the beginning of the window, these are not >>>>>>>>>>> necessarily the >>>>>>>>>>> ones that arrive first or last. [1] >>>>>>>>>>> >>>>>>>>>>> which has put a damper on our investigation. >>>>>>>>>>> >>>>>>>>>>> I then found the deduplication SQL doc [2], but I have a hard >>>>>>>>>>> time parsing what the SQL does and we've never used TemporaryViews >>>>>>>>>>> or >>>>>>>>>>> proctime before. >>>>>>>>>>> Is this essentially what we want? >>>>>>>>>>> Will just using this SQL be safe for a job that is unbounded and >>>>>>>>>>> just wants to deduplicate a document write to whatever the most >>>>>>>>>>> current one >>>>>>>>>>> is (i.e. will restoring from a checkpoint maintain our unbounded >>>>>>>>>>> consistency and will deletes work)? >>>>>>>>>>> >>>>>>>>>>> [1] >>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html >>>>>>>>>>> [2] >>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication >>>>>>>>>>> >>>>>>>>>>> Thanks! >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> >>>>>>>>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Remind.com <https://www.remind.com/> | BLOG >>>>>>>>>>> <http://blog.remind.com/> | FOLLOW US >>>>>>>>>>> <https://twitter.com/remindhq> | LIKE US >>>>>>>>>>> <https://www.facebook.com/remindhq> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> >>>>>>>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Remind.com <https://www.remind.com/> | BLOG >>>>>>>>>> <http://blog.remind.com/> | FOLLOW US >>>>>>>>>> <https://twitter.com/remindhq> | LIKE US >>>>>>>>>> <https://www.facebook.com/remindhq> >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> >>>>>>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>>>>>> >>>>>>>>> >>>>>>>>> Remind.com <https://www.remind.com/> | BLOG >>>>>>>>> <http://blog.remind.com/> | FOLLOW US >>>>>>>>> <https://twitter.com/remindhq> | LIKE US >>>>>>>>> <https://www.facebook.com/remindhq> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> >>>>>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>>>>> >>>>>>>> >>>>>>>> Remind.com <https://www.remind.com/> | BLOG >>>>>>>> <http://blog.remind.com/> | FOLLOW US >>>>>>>> <https://twitter.com/remindhq> | LIKE US >>>>>>>> <https://www.facebook.com/remindhq> >>>>>>>> >>>>>>> >>>>>> >>>>>> -- >>>>>> >>>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>>> >>>>>> >>>>>> Remind.com <https://www.remind.com/> | BLOG >>>>>> <http://blog.remind.com/> | FOLLOW US >>>>>> <https://twitter.com/remindhq> | LIKE US >>>>>> <https://www.facebook.com/remindhq> >>>>>> >>>>> >>>>> >>>>> -- >>>>> >>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>> >>>>> >>>>> Remind.com 
--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>
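P.S. The deduplication query from [2] boils down to roughly the following shape (a sketch only; the view name user_docs and the columns doc_id, doc, proctime are made-up placeholders, and this only illustrates the query shape, not the checkpoint/delete semantics asked about above):

// Assumes the table has been registered as a view with a processing-time attribute.
this.tableEnv.createTemporaryView("user_docs", userDocsTable)

val dedupedUserDocs = this.tableEnv.sqlQuery(
  """
    |SELECT doc_id, doc
    |FROM (
    |  SELECT *,
    |    ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY proctime DESC) AS row_num
    |  FROM user_docs
    |)
    |WHERE row_num = 1
    |""".stripMargin)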