Great, thank you for the confirmation!

On Thu, Jan 28, 2021 at 11:25 PM Arvid Heise <ar...@apache.org> wrote:
> Hi Rex,
>
> there cannot be any late event in processing time by definition (maybe on
> a quantum computer?), so you should be fine. The timestamp of records in
> processing time is monotonically increasing.
>
> Best,
>
> Arvid
>
> On Fri, Jan 29, 2021 at 1:14 AM Rex Fenley <r...@remind101.com> wrote:
>
>> Switching to TumblingProcessingTimeWindows seems to have solved that
>> problem.
>>
>> For my own understanding, this won't have any "late" and therefore
>> dropped records, right? We cannot blindly drop a record from the
>> aggregate evaluation; it just needs to take all the records it gets in a
>> window, process them, and then the aggregate will take whatever is last,
>> in order.
>>
>> Thanks!
>>
>> On Thu, Jan 28, 2021 at 1:01 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> It looks like it wants me to call assignTimestampsAndWatermarks, but I
>>> already have a time window, and I'd expect everything entering this
>>> stream to simply be aggregated during that window:
>>> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>
>>> On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> I think I may have been affected by some late night programming.
>>>>
>>>> Slightly revised how I'm using my aggregate:
>>>>
>>>> val userDocsStream =
>>>>   this.tableEnv
>>>>     .toRetractStream(userDocsTable, classOf[Row])
>>>>     .keyBy(_.f1.getField(0))
>>>> val compactedUserDocsStream = userDocsStream
>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>>   .aggregate(new CompactionAggregate())
>>>>
>>>> but this now gives me the following exception:
>>>>
>>>> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>>>> timestamp marker). Is the time characteristic set to 'ProcessingTime',
>>>> or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>>>>   at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>>>>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
>>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
>>>>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>>>>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>>>>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>>   at java.base/java.lang.Thread.run(Thread.java:829)
>>>>
>>>> which I'm not at all sure how to interpret.
>>>>
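A note on the exception above: TumblingEventTimeWindows requires event-time timestamps and watermarks on the stream, so the fix is either processing-time windows (the route ultimately taken higher up in this thread) or assigning watermarks before the keyBy. A minimal, untested sketch of the watermark route, assuming the rows carry an epoch-millis event-time field at a hypothetical index 1:

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.types.Row

val userDocsStream = tableEnv
  .toRetractStream(userDocsTable, classOf[Row])
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      // tolerate a little out-of-orderness between parallel source instances
      .forBoundedOutOfOrderness[Tuple2[java.lang.Boolean, Row]](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[java.lang.Boolean, Row]] {
        override def extractTimestamp(el: Tuple2[java.lang.Boolean, Row], recordTs: Long): Long =
          el.f1.getField(1).asInstanceOf[Long] // hypothetical epoch-millis column
      })
  )
  .keyBy(_.f1.getField(0))

If the retract stream is strictly ordered per key, WatermarkStrategy.forMonotonousTimestamps() would also work in place of the bounded-out-of-orderness strategy.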
>>>> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Ok, that sounds like it confirms my expectations.
>>>>>
>>>>> So I tried running my above code and had to slightly edit it to use
>>>>> java Tuple2, because our execution environment stuff is all in Java.
>>>>>
>>>>> class CompactionAggregate
>>>>>     extends AggregateFunction[
>>>>>       Tuple2[java.lang.Boolean, Row],
>>>>>       Tuple2[java.lang.Boolean, Row],
>>>>>       Tuple2[java.lang.Boolean, Row]
>>>>>     ] {
>>>>>
>>>>>   override def createAccumulator() = new Tuple2(false, null)
>>>>>
>>>>>   // Just take the latest value to compact.
>>>>>   override def add(
>>>>>       value: Tuple2[java.lang.Boolean, Row],
>>>>>       accumulator: Tuple2[java.lang.Boolean, Row]
>>>>>   ) =
>>>>>     value
>>>>>
>>>>>   override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>>>>>     accumulator
>>>>>
>>>>>   // This is a required function that we don't use.
>>>>>   override def merge(
>>>>>       a: Tuple2[java.lang.Boolean, Row],
>>>>>       b: Tuple2[java.lang.Boolean, Row]
>>>>>   ) =
>>>>>     throw new NotImplementedException()
>>>>> }
>>>>>
>>>>> But when running I get the following error:
>>>>>
>>>>> > Caused by: java.lang.RuntimeException: Could not extract key from
>>>>> > [redacted row]
>>>>> > ...
>>>>> > Caused by: org.apache.flink.table.api.ValidationException:
>>>>> > Unsupported kind 'DELETE' of a row [redacted row]. Only rows with
>>>>> > 'INSERT' kind are supported when converting to an expression.
>>>>>
>>>>> I'm googling around and haven't found anything informative about what
>>>>> might be causing this issue. Any ideas?
>>>>>
>>>>> I'll also take a look at the SQL functions you suggested and see if I
>>>>> can use those.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Hi Rex,
>>>>>>
>>>>>> whether your keyBy (and with it your join/grouping/windowing) is
>>>>>> random or not depends on the relationship of the join/grouping key
>>>>>> with your Kafka partitioning key.
>>>>>>
>>>>>> Say your partitioning key is document_id. Then, any join/grouping key
>>>>>> that is composed of (or is exactly) document_id will retain the
>>>>>> order. You should always ask yourself the question: can two records
>>>>>> coming from the ordered Kafka partition X be processed by two
>>>>>> different operator instances? For a join/grouping operator, there is
>>>>>> only the strict guarantee that all records with the same key will be
>>>>>> shuffled into the same operator instance.
>>>>>>
>>>>>> Your compaction in general looks good, but I'm not deep into Table
>>>>>> API. I'm quite sure that the FIRST_VALUE and LAST_VALUE functions in
>>>>>> Table API should already do what you want. [1]
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>>>>>>
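For what it's worth, a rough sketch of the LAST_VALUE route Arvid mentions, assuming hypothetical columns id and payload on the table (the column names are made up, and this is untested):

// Register the table under a name so SQL can reference it.
tableEnv.createTemporaryView("userDocs", userDocsTable)

// Keep only the latest value per document id; FIRST_VALUE/LAST_VALUE are
// built-in aggregate functions in the blink planner per [1] above.
val compacted = tableEnv.sqlQuery(
  """
    |SELECT id, LAST_VALUE(payload) AS payload
    |FROM userDocs
    |GROUP BY id
  """.stripMargin
)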
>>>>>> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>>
>>>>>>> In addition to those questions, assuming that keyed streams are in
>>>>>>> order, I've come up with the following solution to compact our
>>>>>>> records and only pick the most recent one per id before sending to
>>>>>>> the ES sink.
>>>>>>>
>>>>>>> The first item in the Row is the document ID / primary key, which we
>>>>>>> want to compact records on:
>>>>>>>
>>>>>>> val userDocsStream =
>>>>>>>   userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
>>>>>>> userDocsStream
>>>>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>>>>>   .aggregate(new CompactionAggregate())
>>>>>>>
>>>>>>> class CompactionAggregate
>>>>>>>     extends AggregateFunction[
>>>>>>>       (Boolean, Row),
>>>>>>>       (Boolean, Row),
>>>>>>>       (Boolean, Row)
>>>>>>>     ] {
>>>>>>>
>>>>>>>   override def createAccumulator() = (false, null)
>>>>>>>
>>>>>>>   // Just take the latest value to compact.
>>>>>>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>>>>>>     value
>>>>>>>
>>>>>>>   override def getResult(accumulator: (Boolean, Row)) =
>>>>>>>     accumulator
>>>>>>>
>>>>>>>   // This is a required function that we don't use.
>>>>>>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>>>>>>     throw new NotImplementedException()
>>>>>>> }
>>>>>>>
>>>>>>> I'm hoping that if the last record in the window is an insert it
>>>>>>> picks that, and if it's a retract then it picks that, and then when
>>>>>>> we send this to the ES sink we will simply check true or false in
>>>>>>> the first element of the tuple for an insert or delete request to
>>>>>>> ES. Does this seem like it will work?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
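For illustration, a rough, untested sketch of that sink-side branching with the Elasticsearch connector's ElasticsearchSinkFunction; the index name "user_docs", the column layout, and the id extraction are all hypothetical:

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.types.Row
import org.elasticsearch.client.Requests

class UpsertOrDeleteSinkFunction extends ElasticsearchSinkFunction[(Boolean, Row)] {
  override def process(element: (Boolean, Row), ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    val (isInsert, row) = element
    val id = row.getField(0).toString // document id / primary key
    if (isInsert) {
      // accumulate message -> index (upsert) the document
      val source = new java.util.HashMap[String, AnyRef]()
      source.put("payload", row.getField(1)) // hypothetical column
      indexer.add(Requests.indexRequest().index("user_docs").id(id).source(source))
    } else {
      // retract message -> delete the document
      indexer.add(Requests.deleteRequest("user_docs").id(id))
    }
  }
}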
>>>>>>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>
>>>>>>>> This is great info, thanks!
>>>>>>>>
>>>>>>>> My question then becomes, what constitutes a random shuffle?
>>>>>>>> Currently we're using the Table API with minibatch on Flink
>>>>>>>> v1.11.3. Do our joins output a keyed stream of records by join key,
>>>>>>>> or is this random? I imagine that they'd have to have a key for
>>>>>>>> retracts and accumulates to arrive in order on the next downstream
>>>>>>>> operator. Same with aggs, but on the groupBy key.
>>>>>>>>
>>>>>>>> Does this sound correct to you?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <ar...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Rex,
>>>>>>>>>
>>>>>>>>> indeed these two statements look like they contradict each other,
>>>>>>>>> but they are looking at two sides of the same coin.
>>>>>>>>> Flink simply puts records into windows in FIFO order. That is,
>>>>>>>>> there is no ordering on event time if there are late events. So if
>>>>>>>>> your elements arrive ordered, the ordering is retained. If your
>>>>>>>>> elements arrive unordered, the same unordered order is retained.
>>>>>>>>>
>>>>>>>>> However, note that Flink can only guarantee FIFO according to your
>>>>>>>>> topology. Consider a source with parallelism 2, each instance
>>>>>>>>> reading data from an ordered Kafka partition (k1, k2)
>>>>>>>>> respectively. Each partition has records with keys, such that no
>>>>>>>>> key appears in both partitions (the default behavior if you set
>>>>>>>>> keys but no partition while writing to Kafka).
>>>>>>>>>
>>>>>>>>> 1) Let's assume you do a simple transformation and write the
>>>>>>>>> records back into Kafka with the same key. Then you can be sure
>>>>>>>>> that the order of the records is retained.
>>>>>>>>>
>>>>>>>>> 2) Now you add a random shuffle and then the transformation. Now
>>>>>>>>> two successive records may be processed in parallel, and there is
>>>>>>>>> a race condition over which is written first into Kafka. So order
>>>>>>>>> is not retained.
>>>>>>>>>
>>>>>>>>> 3) You shuffle both partitions by the Kafka key (keyBy) and do
>>>>>>>>> some aggregation. Two successive records with the same key will
>>>>>>>>> always be processed by the same aggregation operator. So the order
>>>>>>>>> is retained for each key. (Note that this is what you usually
>>>>>>>>> want, and what Kafka gives you if you don't set the partition
>>>>>>>>> explicitly and just provide a key.)
>>>>>>>>>
>>>>>>>>> 4) You shuffle both partitions by a different key. Then two
>>>>>>>>> successive Kafka records could again be processed in parallel,
>>>>>>>>> such that there is a race condition.
>>>>>>>>>
>>>>>>>>> Note that windows are a kind of aggregation.
>>>>>>>>>
>>>>>>>>> So Flink is never going to restore an ordering that is not there
>>>>>>>>> (because it's too costly and there are too many unknowns). But you
>>>>>>>>> can infer the guarantees by analyzing your topology.
>>>>>>>>>
>>>>>>>>> ---
>>>>>>>>>
>>>>>>>>> Please note that there is a common pitfall when you work with Kafka:
>>>>>>>>> - Ordering of records in Kafka is only guaranteed if you set
>>>>>>>>> max.in.flight.requests.per.connection to 1. [1]
>>>>>>>>> - Often you also want to set enable.idempotence and acks=all.
>>>>>>>>>
>>>>>>>>> That is true for the upstream application, and if you plan to
>>>>>>>>> write back to Kafka, you also need to set that in Flink.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>>>>>>>>>
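Concretely, those producer settings from [1] look like the following; the broker address is a placeholder. They apply to the upstream application's producer, and the same Properties object can also be handed to Flink's FlinkKafkaProducer when writing back to Kafka:

import java.util.Properties

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "broker:9092") // placeholder address
// Keep per-partition ordering across retries:
producerProps.setProperty("max.in.flight.requests.per.connection", "1")
// Avoid duplicates on retry, and wait for all in-sync replicas:
producerProps.setProperty("enable.idempotence", "true")
producerProps.setProperty("acks", "all")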
>>>>>>>>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> I began reading
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>>>>>>>>
>>>>>>>>>> > *Redistributing* streams (between *map()* and *keyBy/window*,
>>>>>>>>>> > as well as between *keyBy/window* and *sink*) change the
>>>>>>>>>> > partitioning of streams. Each *operator subtask* sends data to
>>>>>>>>>> > different target subtasks, depending on the selected
>>>>>>>>>> > transformation. Examples are *keyBy()* (re-partitions by hash
>>>>>>>>>> > code), *broadcast()*, or *rebalance()* (random redistribution).
>>>>>>>>>> > In a *redistributing* exchange, order among elements is only
>>>>>>>>>> > preserved for each pair of sending and receiving task (for
>>>>>>>>>> > example subtask[1] of *map()* and subtask[2] of *keyBy/window*).
>>>>>>>>>>
>>>>>>>>>> This makes it sound like ordering on the same partition/key is
>>>>>>>>>> always maintained, which is exactly the ordering guarantee that I
>>>>>>>>>> need. This seems to slightly contradict the statement "Flink
>>>>>>>>>> provides no guarantees about the order of the elements within a
>>>>>>>>>> window" for keyed state. So is it true that ordering _is_
>>>>>>>>>> guaranteed for identical keys?
>>>>>>>>>>
>>>>>>>>>> If I'm not mistaken, the state in the Table API is always
>>>>>>>>>> considered keyed state for a join or aggregate. Or am I missing
>>>>>>>>>> something?
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Our data arrives in order from Kafka, so we are hoping to use
>>>>>>>>>>> that same order for our processing.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Going further, if "Flink provides no guarantees about the order
>>>>>>>>>>>> of the elements within a window", then with minibatch, which I
>>>>>>>>>>>> assume uses a window under the hood, any aggregates that expect
>>>>>>>>>>>> rows to arrive in order will fail to keep their consistency. Is
>>>>>>>>>>>> this correct?
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We have a job from CDC to a large unbounded Flink plan to
>>>>>>>>>>>>> Elasticsearch.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Currently, we have been relentlessly trying to reduce our
>>>>>>>>>>>>> record amplification which, when our Elasticsearch index is
>>>>>>>>>>>>> near fully populated, completely bottlenecks our write
>>>>>>>>>>>>> performance. We recently decided to try a new job using
>>>>>>>>>>>>> mini-batch. At first this seemed promising, but at some point
>>>>>>>>>>>>> we began getting huge record amplification in a join operator.
>>>>>>>>>>>>> It appears that minibatch may only batch on aggregate
>>>>>>>>>>>>> operators?
>>>>>>>>>>>>>
>>>>>>>>>>>>> So we're now thinking that we should have a window before our
>>>>>>>>>>>>> ES sink which only takes the last record for any unique
>>>>>>>>>>>>> document id in the window, since that's all we really want to
>>>>>>>>>>>>> send anyway. However, when investigating turning a table into
>>>>>>>>>>>>> a keyed window stream for deduping, and then back into a
>>>>>>>>>>>>> table, I read the following:
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Attention: Flink provides no guarantees about the order of
>>>>>>>>>>>>> > the elements within a window. This implies that although an
>>>>>>>>>>>>> > evictor may remove elements from the beginning of the
>>>>>>>>>>>>> > window, these are not necessarily the ones that arrive first
>>>>>>>>>>>>> > or last. [1]
>>>>>>>>>>>>>
>>>>>>>>>>>>> which has put a damper on our investigation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I then found the deduplication SQL doc [2], but I have a hard
>>>>>>>>>>>>> time parsing what the SQL does, and we've never used
>>>>>>>>>>>>> TemporaryViews or proctime before. Is this essentially what we
>>>>>>>>>>>>> want? Will just using this SQL be safe for a job that is
>>>>>>>>>>>>> unbounded and just wants to deduplicate a document write to
>>>>>>>>>>>>> whatever the most current one is (i.e. will restoring from a
>>>>>>>>>>>>> checkpoint maintain our unbounded consistency, and will
>>>>>>>>>>>>> deletes work)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>>>>>>>>>>> [2]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks!
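For reference, the deduplication query in [2] amounts to ranking the rows per key by a time attribute and keeping only the first. A sketch, assuming hypothetical columns doc_id and payload and a processing-time attribute proc_time declared on the source table (e.g. via `proc_time AS PROCTIME()` in the DDL):

// Keeps only the most recent row per doc_id; with ORDER BY ... DESC and
// row_num = 1, older rows for the same key are retracted as new ones arrive.
val deduplicated = tableEnv.sqlQuery(
  """
    |SELECT doc_id, payload
    |FROM (
    |  SELECT doc_id, payload,
    |         ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY proc_time DESC) AS row_num
    |  FROM userDocs
    |)
    |WHERE row_num = 1
  """.stripMargin
)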
--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>