Perfect. In our use case, the Kafka partition key and the keyBy use the
exact same field, and thus the order will be preserved.
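
To convince myself, here is a minimal plain-Java simulation of the guarantee described in the quoted reply below. It is only a sketch under stated assumptions: `kafkaPartition` and `flinkTask` are hypothetical stand-ins for the two partitioning functions (they are not the real Kafka or Flink implementations), and `Rec`, `produce`, and `shuffle` are names invented for illustration. Each source task reads its partition in order, and the receiving tasks interleave their inputs arbitrarily.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

class KeyOrderSketch {
    /** Hypothetical record: a key plus a per-key sequence number. */
    static final class Rec {
        final String key;
        final int seq;
        Rec(String key, int seq) { this.key = key; this.seq = seq; }
    }

    // Stand-in for the Kafka partitioner (assumption, not the real hash).
    static int kafkaPartition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Stand-in for keyBy routing; deliberately a different function.
    static int flinkTask(String key, int parallelism) {
        return Math.floorMod(31 * key.hashCode() + 17, parallelism);
    }

    // Append records to partition logs; per-key sequence numbers only grow.
    static List<List<Rec>> produce(String[] keys, int perKey, int numPartitions) {
        List<List<Rec>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) partitions.add(new ArrayList<>());
        for (int seq = 0; seq < perKey; seq++)
            for (String key : keys)
                partitions.get(kafkaPartition(key, numPartitions)).add(new Rec(key, seq));
        return partitions;
    }

    // One source task per partition reads in order; which source emits next
    // is random, modelling arbitrary interleaving at the receiving tasks.
    static List<List<Rec>> shuffle(List<List<Rec>> partitions, int parallelism, long seed) {
        List<List<Rec>> tasks = new ArrayList<>();
        for (int t = 0; t < parallelism; t++) tasks.add(new ArrayList<>());
        int[] pos = new int[partitions.size()];
        int remaining = 0;
        for (List<Rec> p : partitions) remaining += p.size();
        Random rnd = new Random(seed);
        while (remaining > 0) {
            int p = rnd.nextInt(partitions.size());
            if (pos[p] == partitions.get(p).size()) continue; // source drained
            Rec r = partitions.get(p).get(pos[p]++);
            tasks.get(flinkTask(r.key, parallelism)).add(r);
            remaining--;
        }
        return tasks;
    }

    // True if every task sees strictly increasing sequence numbers per key.
    static boolean perKeyOrderPreserved(List<List<Rec>> tasks) {
        for (List<Rec> task : tasks) {
            Map<String, Integer> lastSeq = new HashMap<>();
            for (Rec r : task) {
                Integer prev = lastSeq.put(r.key, r.seq);
                if (prev != null && prev >= r.seq) return false;
            }
        }
        return true;
    }

    // True if records of one partition are routed to more than one task.
    static boolean partitionFansOut(List<Rec> partition, int parallelism) {
        Set<Integer> targets = new HashSet<>();
        for (Rec r : partition) targets.add(flinkTask(r.key, parallelism));
        return targets.size() > 1;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c", "d", "e", "f"};
        List<List<Rec>> partitions = produce(keys, 5, 2);
        List<List<Rec>> tasks = shuffle(partitions, 3, 42L);
        System.out.println("per-key order preserved: " + perKeyOrderPreserved(tasks));
        System.out.println("partition 1 fans out: " + partitionFansOut(partitions.get(1), 3));
    }
}
```

The per-key check holds for any seed because all records of a key come from a single partition and are read in order by a single source task. The fan-out check depends on the two hash functions differing, which is the caveat about partitions versus keys.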

On Mon, Dec 11, 2017 at 4:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> the order of records that are sent from one task to another task is
> preserved (task refers to the parallel instance of an operator).
> However, a task that receives records from multiple input tasks consumes
> records from its inputs in arbitrary order.
>
> If a job reads from a partitioned Kafka topic and does a keyBy on the
> partitioning key of the Kafka topic, an operator task that follows the
> keyBy consumes all records with the same key from exactly one input task
> (the one reading the Kafka partition for the key).
> However, since Flink's and Kafka's partitioning functions are not the
> same, records from the same partition with different keys can be sent to
> different tasks.
>
> So:
> 1) Records from the same partition might not be processed by the same
> operator (and hence not in order).
> 2) Records with the same key are processed by the same operator in the
> same order in which they were read from the partition.
>
> Best,
> Fabian
>
> 2017-12-09 18:09 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> An additional question: if the source is key-partitioned (Kafka), does a
>> keyBy retain the order of a Kafka partition across a shuffle?
>>
>> On Fri, Dec 8, 2017 at 1:12 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I understand that. Let me elaborate. The sequence of events is
>>>
>>> 1. Round-robin dispatch to the Kafka cluster (it is not partitioned on the
>>> key, which we may ultimately do, and then I will have more questions on how
>>> to key by and still keep order, probably avoiding a shuffle :) ).
>>> 2. Key by a high-cardinality key.
>>> 3. Sessionize.
>>> 4. Because of the round-robin dispatch to Kafka (and even if partitioned on
>>> the key with a subsequent keyBy), the sort order is not retained and the ACC
>>> has to hold on to the elements in a List. When the window is finalized, we
>>> sort the List in the ACC and do pagination. We are looking for paths within
>>> a session from a source event to a sink event. I was hoping to use RocksDB
>>> state as the final merged list, and thus off-heap, and to use a count-based
>>> Trigger to evaluate the ACC and merge the inter-trigger collection into the
>>> master copy rather than keeping all events in the ACC (I would imagine a
>>> very general pattern to use).
>>>
>>> Does that make sense?
>>>
>>>
>>> On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> If you use an AggregateFunction in this way (i.e. for a window) the
>>>> ACC should in fact be kept in the state backend. Did you configure the job
>>>> to use RocksDB? How are the memory problems manifesting?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> On 6. Dec 2017, at 14:57, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Hi Vishal,
>>>>
>>>> you are right, it is not possible to use state in an AggregateFunction
>>>> because windows need to be mergeable.
>>>> An AggregateFunction knows how to merge its accumulators but merging
>>>> generic state is not possible.
>>>>
>>>> I am not aware of an efficient and easy workaround for this.
>>>> If you want to use the provided session window logic, you can use a
>>>> WindowFunction that performs all computations when the window is triggered.
>>>> This means that aggregations do not happen eagerly and all events for a
>>>> window are collected and held in state.
>>>> Another approach could be to implement the whole logic (incl. the
>>>> session windowing) using a ProcessFunction. This would be a major effort
>>>> though.
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>
>>>>> It seems that this has to do with session windows that are mergeable?
>>>>> I tried the RichWindowFunction and that seems to suggest that one cannot
>>>>> use state? Any ideas, folks...
>>>>>
>>>>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I have a simple aggregation with one caveat. For some reason I have
>>>>>> to keep a large amount of state till the window is GCed. The state is
>>>>>> within the accumulator (ACC). I am hitting a memory bottleneck and would
>>>>>> like to offload the state to the state backend (RocksDB), keeping the
>>>>>> between-checkpoint state in memory (seems to be an obvious fix). I am
>>>>>> not, though, allowed to have a RichAggregateFunction in the aggregate
>>>>>> method of a windowed stream. That raises two questions:
>>>>>>
>>>>>> 1. Why?
>>>>>> 2. Is there an alternative for stateful window aggregation where we
>>>>>> manage the state?
>>>>>>
>>>>>> Thanks Vishal
>>>>>>
>>>>>>
>>>>>> Here is the code (generic, but it works):
>>>>>>
>>>>>> SingleOutputStreamOperator<OUT> retVal = input
>>>>>>         .keyBy(keySelector)
>>>>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>>>>         .aggregate(
>>>>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>>>>
>>>>>>                     @Override
>>>>>>                     public ACC createAccumulator() {
>>>>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>>>>                         newInstance.resetLocal();
>>>>>>                         return newInstance;
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public void add(IN value, ACC accumulator) {
>>>>>>                         accumulator.add(value);
>>>>>>
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public OUT getResult(ACC accumulator) {
>>>>>>                         return accumulator.getLocalValue();
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public ACC merge(ACC a, ACC b) {
>>>>>>                         a.merge(b);
>>>>>>                         return a;
>>>>>>                     }
>>>>>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>>>>                     @Override
>>>>>>                     public void apply(KEY s, TimeWindow window,
>>>>>>                             Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>>>>                         out.collect(input.iterator().next());
>>>>>>                     }
>>>>>>                 }, accType, aggregationResultType, aggregationResultType);
>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>
>
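
For the buffer-then-sort accumulator discussed earlier in the thread, a minimal plain-Java sketch may help. It mirrors the four methods of the AggregateFunction in the quoted code (createAccumulator, add, getResult, merge) but does not implement the Flink interface, so it runs without any dependencies. `Event` and `SessionAcc` are hypothetical names used for illustration only. The point it tries to show: merging two session windows only needs to concatenate their buffers, and sorting can be deferred until the result is requested.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SessionAccSketch {
    /** Hypothetical event carrying an event-time timestamp. */
    static final class Event {
        final long timestamp;
        final String name;
        Event(long timestamp, String name) {
            this.timestamp = timestamp;
            this.name = name;
        }
    }

    /** Hypothetical accumulator: an unsorted buffer of events. */
    static final class SessionAcc {
        final List<Event> events = new ArrayList<>();
    }

    static SessionAcc createAccumulator() {
        return new SessionAcc();
    }

    // Events may arrive out of order, so just append.
    static void add(Event value, SessionAcc acc) {
        acc.events.add(value);
    }

    // Merging two sessions concatenates their buffers; no ordering is assumed.
    static SessionAcc merge(SessionAcc a, SessionAcc b) {
        a.events.addAll(b.events);
        return a;
    }

    // Sort a copy by timestamp only when the window result is needed.
    static List<Event> getResult(SessionAcc acc) {
        List<Event> sorted = new ArrayList<>(acc.events);
        sorted.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        return sorted;
    }

    public static void main(String[] args) {
        SessionAcc a = createAccumulator();
        add(new Event(3L, "click"), a);
        add(new Event(1L, "open"), a);
        SessionAcc b = createAccumulator();
        add(new Event(4L, "close"), b);
        add(new Event(2L, "scroll"), b);
        for (Event e : getResult(merge(a, b))) {
            System.out.println(e.timestamp + " " + e.name);
        }
    }
}
```

This keeps every event in the accumulator until the window fires, which is exactly the memory cost the original question is about; it is not a fix for that, only an illustration of why the accumulator stays mergeable.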
