Hi,

the order of records that are sent from one task to another task is preserved (a task is a parallel instance of an operator). However, a task that receives records from multiple input tasks consumes records from its inputs in arbitrary order.

If a job reads from a partitioned Kafka topic and does a keyBy on the partitioning key of the Kafka topic, an operator task that follows the keyBy consumes all records with the same key from exactly one input task (the one reading the Kafka partition for that key). However, since Flink's and Kafka's partitioning functions are not the same, records from the same partition with different keys can be sent to different tasks. So:

1) Records from the same partition might not be processed by the same operator task (and hence not in order).
2) Records with the same key are processed by the same operator task, in the same order in which they were read from the partition.
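For illustration, a minimal sketch of such a pipeline. The topic name, broker address, the 0.11 connector, and the key extracted from each record are only placeholders, and the topic is assumed to be partitioned by that same key on the producer side:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    public class KeyByOrderingSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
            props.setProperty("group.id", "ordering-demo");

            // "clicks" is assumed to be partitioned by user id on the producer side
            DataStream<String> lines = env.addSource(
                    new FlinkKafkaConsumer011<>("clicks", new SimpleStringSchema(), props));

            lines
                    // key by the same attribute the topic is partitioned on
                    // (here: the text before the first comma, standing in for the user id)
                    .keyBy(line -> line.split(",", 2)[0])
                    // per key, records arrive in the order they were read from the partition;
                    // records with *different* keys from one partition may go to different tasks
                    .map(String::toUpperCase)
                    .print();

            env.execute("keyBy ordering sketch");
        }
    }

Class names match the Kafka 0.11 connector of the Flink releases current at the time; newer releases would use the universal FlinkKafkaConsumer or KafkaSource instead.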
Best,
Fabian

2017-12-09 18:09 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:

> An additional question: if the source is key-partitioned (Kafka), does a
> keyBy retain the order of a Kafka partition across a shuffle?
>
> On Fri, Dec 8, 2017 at 1:12 PM, Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> I understand that. Let me elaborate. The sequence of events is:
>>
>> 1. Round-robin dispatch to the Kafka cluster (it is not partitioned on the
>> key; we may ultimately do that, and then I will have more questions on how
>> to key by and still keep order, probably avoiding the shuffle :) ).
>> 2. Key by a high-cardinality key.
>> 3. Sessionize.
>> 4. Because of the round robin on Kafka (and even if partitioned on the key
>> with a subsequent keyBy), the sort order is not retained and the ACC has to
>> hold on to the elements in a List. When the window is finalized, we sort the
>> list in the ACC and do pagination. We are looking for paths within a session
>> from a source event to a sink event. I was hoping to use RocksDB state as a
>> final merged list, and thus off heap, and to use a count-based Trigger to
>> evaluate the ACC and merge the inter-trigger collection into the master copy
>> rather than keeping all events in the ACC (I would imagine a very general
>> pattern to use).
>>
>> Does that make sense?
>>
>> On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> If you use an AggregatingFunction in this way (i.e. for a window), the
>>> ACC should in fact be kept in the state backend. Did you configure the job
>>> to use RocksDB? How are the memory problems manifesting?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 6. Dec 2017, at 14:57, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Hi Vishal,
>>>
>>> you are right, it is not possible to use state in an AggregateFunction
>>> because windows need to be mergeable.
>>> An AggregateFunction knows how to merge its accumulators, but merging
>>> generic state is not possible.
>>>
>>> I am not aware of an efficient and easy workaround for this.
>>> If you want to use the provided session window logic, you can use a
>>> WindowFunction that performs all computations when the window is triggered.
>>> This means that aggregations do not happen eagerly and all events for a
>>> window are collected and held in state.
>>> Another approach could be to implement the whole logic (incl. the
>>> session windowing) using a ProcessFunction. This would be a major effort
>>> though.
>>>
>>> Best,
>>> Fabian
>>>
>>> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>
>>>> It seems that this has to do with session windows that are mergeable?
>>>> I tried the RichWindow function and that seems to suggest that one cannot
>>>> use state? Any ideas, folks...
>>>>
>>>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have a simple aggregation with one caveat: for some reason I have to
>>>>> keep a large amount of state until the window is GCed. The state is within
>>>>> the accumulator (ACC). I am hitting a memory bottleneck and would like to
>>>>> offload the state to the state backend (RocksDB), keeping only the
>>>>> between-checkpoint state in memory (seems to be an obvious fix). I am,
>>>>> however, not allowed to use a RichAggregateFunction in the aggregate method
>>>>> of a windowed stream. That begs 2 questions:
>>>>>
>>>>> 1. Why?
>>>>> 2. Is there an alternative for stateful window aggregation where we
>>>>> manage the state?
>>>>>
>>>>> Thanks,
>>>>> Vishal
>>>>>
>>>>> Here is the code (it uses generics, but it works):
>>>>>
>>>>> SingleOutputStreamOperator<OUT> retVal = input
>>>>>         .keyBy(keySelector)
>>>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>>>         .aggregate(
>>>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>>>
>>>>>                     @Override
>>>>>                     public ACC createAccumulator() {
>>>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>>>                         newInstance.resetLocal();
>>>>>                         return newInstance;
>>>>>                     }
>>>>>
>>>>>                     @Override
>>>>>                     public void add(IN value, ACC accumulator) {
>>>>>                         accumulator.add(value);
>>>>>                     }
>>>>>
>>>>>                     @Override
>>>>>                     public OUT getResult(ACC accumulator) {
>>>>>                         return accumulator.getLocalValue();
>>>>>                     }
>>>>>
>>>>>                     @Override
>>>>>                     public ACC merge(ACC a, ACC b) {
>>>>>                         a.merge(b);
>>>>>                         return a;
>>>>>                     }
>>>>>                 },
>>>>>                 new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>>>                     @Override
>>>>>                     public void apply(KEY s, TimeWindow window,
>>>>>                             Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>>>                         out.collect(input.iterator().next());
>>>>>                     }
>>>>>                 },
>>>>>                 accType, aggregationResultType, aggregationResultType);
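For reference, a hedged sketch of the ProcessFunction alternative mentioned above: manual sessionization on a keyed stream, buffering events in keyed ListState (kept in RocksDB when that backend is configured) and closing the session with an event-time timer. The Event type, its getTimestamp() accessor, and the one-open-session-per-key simplification are assumptions, not code from this thread:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Manual session windows on a keyed stream: events are buffered in keyed state
    // (off heap with the RocksDB state backend) and the session is emitted once no
    // event has arrived for gapMs in event time.
    public class ManualSessionFunction extends ProcessFunction<Event, List<Event>> {

        private final long gapMs;

        private transient ListState<Event> buffer;     // all events of the open session
        private transient ValueState<Long> sessionEnd; // timestamp of the latest timer

        public ManualSessionFunction(long gapMs) {
            this.gapMs = gapMs;
        }

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("session-buffer", Event.class));
            sessionEnd = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("session-end", Long.class));
        }

        @Override
        public void processElement(Event value, Context ctx, Collector<List<Event>> out) throws Exception {
            buffer.add(value); // kept in the state backend, not on the heap

            // extend the session: it closes gapMs after the latest element seen so far
            long candidateEnd = ctx.timestamp() + gapMs; // assumes timestamps/watermarks are assigned
            Long currentEnd = sessionEnd.value();
            if (currentEnd == null || candidateEnd > currentEnd) {
                ctx.timerService().registerEventTimeTimer(candidateEnd);
                sessionEnd.update(candidateEnd);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Event>> out) throws Exception {
            Long end = sessionEnd.value();
            if (end == null || timestamp != end) {
                return; // stale timer from an earlier, shorter session bound
            }
            // sort once per session instead of keeping a sorted list in an accumulator
            List<Event> events = new ArrayList<>();
            for (Event e : buffer.get()) {
                events.add(e);
            }
            events.sort(Comparator.comparingLong(Event::getTimestamp));
            out.collect(events);
            buffer.clear();
            sessionEnd.clear();
        }
    }

It would be wired up roughly as input.keyBy(keySelector).process(new ManualSessionFunction(gap.toMilliseconds())), assuming timestamps and watermarks are assigned upstream; newer Flink versions would extend KeyedProcessFunction instead of ProcessFunction.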