Perfect. In our use case, the Kafka partition key and the keyBy use exactly the same field, so the order of records with the same key will be preserved.
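The plain-Java simulation below illustrates the ordering guarantee Fabian describes in the quoted reply. It is a sketch under stated assumptions, not Flink or Kafka code. `kafkaPartition` and `flinkTask` are hypothetical stand-ins chosen only because they are two different functions; Kafka's default partitioner and Flink's key-group assignment both work differently. Each source task forwards one partition over FIFO channels, and each downstream task interleaves its inputs randomly to model "arbitrary order".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Plain-Java simulation of ordering across a keyBy shuffle; no Flink or Kafka code is used.
class KeyByOrderingSketch {

    // A record: its key plus its position in the producer's write order for that key.
    record Event(String key, int seq) {}

    // Hypothetical stand-in for Kafka's partitioner (not the real murmur2-based default).
    static int kafkaPartition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical stand-in for Flink's keyBy routing; deliberately a different function.
    static int flinkTask(String key, int numTasks) {
        return Math.floorMod(key.hashCode() * 31 + 7, numTasks);
    }

    // Returns, per downstream task, the events in the order that task consumed them.
    static List<List<Event>> simulate(List<Event> log, int numPartitions, int numTasks, long seed) {
        // channels.get(target).get(source) is a FIFO queue: order between two tasks is preserved.
        List<List<Deque<Event>>> channels = new ArrayList<>();
        for (int t = 0; t < numTasks; t++) {
            List<Deque<Event>> inputs = new ArrayList<>();
            for (int s = 0; s < numPartitions; s++) inputs.add(new ArrayDeque<>());
            channels.add(inputs);
        }
        // Each partition keeps log order; its source task forwards events in that order, routed by key.
        for (Event e : log) {
            int source = kafkaPartition(e.key(), numPartitions);
            channels.get(flinkTask(e.key(), numTasks)).get(source).addLast(e);
        }
        // Each downstream task interleaves its non-empty inputs arbitrarily (seeded random).
        Random random = new Random(seed);
        List<List<Event>> consumed = new ArrayList<>();
        for (List<Deque<Event>> inputs : channels) {
            List<Deque<Event>> pending = new ArrayList<>();
            for (Deque<Event> q : inputs) if (!q.isEmpty()) pending.add(q);
            List<Event> out = new ArrayList<>();
            while (!pending.isEmpty()) {
                int i = random.nextInt(pending.size());
                out.add(pending.get(i).pollFirst());
                if (pending.get(i).isEmpty()) pending.remove(i);
            }
            consumed.add(out);
        }
        return consumed;
    }

    // True if, within every task, each key's events appear in increasing seq order.
    static boolean perKeyOrderPreserved(List<List<Event>> consumed) {
        for (List<Event> task : consumed) {
            Map<String, Integer> last = new HashMap<>();
            for (Event e : task) {
                Integer prev = last.put(e.key(), e.seq());
                if (prev != null && prev >= e.seq()) return false;
            }
        }
        return true;
    }
}
```

In this model, all events of one key reach a single task in their original order, while different keys from the same partition may be routed to different tasks and interleaved with other inputs. That is why only per-key order survives the keyBy.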
On Mon, Dec 11, 2017 at 4:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> the order of records that are sent from one task to another task is preserved (task refers to the parallel instance of an operator).
> However, a task that receives records from multiple input tasks consumes records from its inputs in arbitrary order.
>
> If a job reads from a partitioned Kafka topic and does a keyBy on the partitioning key of the Kafka topic, an operator task that follows the keyBy consumes all records with the same key from exactly one input task (the one reading the Kafka partition for the key).
> However, since Flink's and Kafka's partitioning functions are not the same, records from the same partition with different keys can be sent to different tasks.
>
> So:
> 1) Records from the same partition might not be processed by the same operator (and hence not in order).
> 2) Records with the same key are processed by the same operator in the same order in which they were read from the partition.
>
> Best,
> Fabian
>
> 2017-12-09 18:09 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> An additional question: if the source is key-partitioned (Kafka), does a keyBy retain the order of a Kafka partition across a shuffle?
>>
>> On Fri, Dec 8, 2017 at 1:12 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> I understand that. Let me elaborate. The sequence of events is:
>>>
>>> 1. Round-robin dispatch to the Kafka cluster (it is not partitioned on the key, which we may ultimately do, and then I will have more questions on how to keyBy and still keep order, probably avoiding a shuffle :) ).
>>> 2. keyBy a high-cardinality key.
>>> 3. Sessionize.
>>> 4. Because of the RR on Kafka (and even if partitioned on the key with a subsequent keyBy), the sort order is not retained and the ACC has to hold on to the elements in a List.
>>> When the window is finalized, we sort the in-ACC List and do pagination. We are looking for event-based paths within a session, from a source event to a sink event. I was hoping to use RocksDB state as a final merged list (and thus off heap), and to use a count-based Trigger to evaluate the ACC and merge the inter-trigger collection into the master copy, rather than keeping all events in the ACC (I would imagine this is a very general pattern).
>>>
>>> Does that make sense?
>>>
>>> On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> If you use an AggregateFunction in this way (i.e. for a window), the ACC should in fact be kept in the state backend. Did you configure the job to use RocksDB? How are the memory problems manifesting?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 6. Dec 2017, at 14:57, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Hi Vishal,
>>>>
>>>> you are right, it is not possible to use state in an AggregateFunction because windows need to be mergeable.
>>>> An AggregateFunction knows how to merge its accumulators, but merging generic state is not possible.
>>>>
>>>> I am not aware of an efficient and easy workaround for this.
>>>> If you want to use the provided session window logic, you can use a WindowFunction that performs all computations when the window is triggered.
>>>> This means that aggregations do not happen eagerly and all events for a window are collected and held in state.
>>>> Another approach could be to implement the whole logic (incl. the session windowing) using a ProcessFunction. This would be a major effort, though.
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>
>>>>> It seems that this has to do with session windows that are mergeable?
>>>>> I tried the RichWindowFunction, and that seems to suggest that one cannot use state? Any ideas, folks...
>>>>>
>>>>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> I have a simple aggregation with one caveat. For some reason I have to keep a large amount of state till the window is GCed. The state is within the accumulator (ACC). I am hitting a memory bottleneck and would like to offload the state to the state backend (RocksDB), keeping the between-checkpoint state in memory (seems to be an obvious fix). I am not allowed, though, to use a RichAggregateFunction in the aggregate method of a windowed stream. That raises two questions:
>>>>>>
>>>>>> 1. Why?
>>>>>> 2. Is there an alternative for stateful window aggregation where we manage the state?
>>>>>>
>>>>>> Thanks,
>>>>>> Vishal
>>>>>>
>>>>>> Here is the code (generics, but it works):
>>>>>>
>>>>>> SingleOutputStreamOperator<OUT> retVal = input
>>>>>>         .keyBy(keySelector)
>>>>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>>>>         .aggregate(
>>>>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>>>>
>>>>>>                     @Override
>>>>>>                     public ACC createAccumulator() {
>>>>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>>>>                         newInstance.resetLocal();
>>>>>>                         return newInstance;
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public void add(IN value, ACC accumulator) {
>>>>>>                         accumulator.add(value);
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public OUT getResult(ACC accumulator) {
>>>>>>                         return accumulator.getLocalValue();
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public ACC merge(ACC a, ACC b) {
>>>>>>                         a.merge(b);
>>>>>>                         return a;
>>>>>>                     }
>>>>>>                 },
>>>>>>                 new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>>>>                     @Override
>>>>>>                     public void apply(KEY s, TimeWindow window,
>>>>>>                             Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>>>>                         out.collect(input.iterator().next());
>>>>>>                     }
>>>>>>                 },
>>>>>>                 accType,
>>>>>>                 aggregationResultType,
>>>>>>                 aggregationResultType);
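The pattern discussed in this thread (buffer events in the ACC, merge accumulators when session windows merge, then sort and paginate when the window is finalized) can be sketched in plain Java without Flink. This is an illustration under assumptions, not Flink's session-window implementation. `SessionListAccSketch`, `ListAcc`, `sessionize`, and the page size are hypothetical names; the sketch handles a single key, each event opens a window [ts, ts + gap), and overlapping windows are merged. It shows why the aggregate must define `merge`: when an out-of-order event bridges two sessions, their accumulators have to be combined. Fabian notes this operation is not possible for generic state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch of mergeable session windows with a list accumulator (not Flink's code).
class SessionListAccSketch {

    record Event(long ts, String payload) {}

    // Buffers events unsorted; sorting and pagination are deferred to getResult.
    static final class ListAcc {
        final List<Event> events = new ArrayList<>();

        void add(Event e) { events.add(e); }

        // Session windows merge, so their accumulators must merge too.
        ListAcc merge(ListAcc other) {
            events.addAll(other.events);
            return this;
        }

        // At window finalization: sort by event time, then cut into pages of pageSize.
        List<List<Event>> getResult(int pageSize) {
            List<Event> sorted = new ArrayList<>(events);
            sorted.sort(Comparator.comparingLong(Event::ts));
            List<List<Event>> pages = new ArrayList<>();
            for (int i = 0; i < sorted.size(); i += pageSize) {
                pages.add(sorted.subList(i, Math.min(i + pageSize, sorted.size())));
            }
            return pages;
        }
    }

    // A session window [start, end) for one key, with its accumulator.
    static final class Session {
        long start;
        long end;
        ListAcc acc = new ListAcc();

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Each event opens [ts, ts + gap); every existing session it overlaps is merged into it.
    static List<Session> sessionize(List<Event> arrivals, long gap) {
        List<Session> sessions = new ArrayList<>();
        for (Event e : arrivals) {
            Session merged = new Session(e.ts(), e.ts() + gap);
            merged.acc.add(e);
            Iterator<Session> it = sessions.iterator();
            while (it.hasNext()) {
                Session s = it.next();
                if (s.start < merged.end && merged.start < s.end) {
                    merged.start = Math.min(merged.start, s.start);
                    merged.end = Math.max(merged.end, s.end);
                    merged.acc.merge(s.acc);
                    it.remove();
                }
            }
            sessions.add(merged);
        }
        sessions.sort(Comparator.comparingLong((Session s) -> s.start));
        return sessions;
    }
}
```

Sorting only in `getResult` keeps `add` cheap. The cost is that every event of a session is held until the window fires.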