In the long run we probably have to provide a hook in the API for this, yes.

On Wed, 25 May 2016 at 15:54 Bart Wyatt <bart.wy...@dsvolition.com> wrote:

> I will give this a shot this morning.
>
>
> Considering this and the other email "Does Kafka connector leverage Kafka
> message keys?", which also ends up talking about hacking around
> KeyedStream's use of a HashPartitioner<>(...), is it worth looking into
> providing a KeyedStream constructor that uses a ForwardPartitioner? This
> was what I was going to try this morning until you gave me a path that
> doesn't involve editing Flink code.
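>
> For reference, the hack I had in mind was roughly this (hypothetical
> sketch only: this extra constructor does not exist and would mean
> patching KeyedStream itself):
>
> // hypothetical extra constructor on KeyedStream that keeps the existing
> // partitioning instead of hashing by key
> public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector,
>         TypeInformation<KEY> keyType, StreamPartitioner<T> partitioner) {
>     // e.g. pass new ForwardPartitioner<T>() here instead of the
>     // new HashPartitioner<>(keySelector) that the existing constructor uses
>     super(dataStream.getExecutionEnvironment(),
>         new PartitionTransformation<>(dataStream.getTransformation(), partitioner));
>     this.keySelector = keySelector;
>     this.keyType = keyType;
> }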
>
>
> -Bart
>
>
>
> ------------------------------
> *From:* Aljoscha Krettek <aljos...@apache.org>
> *Sent:* Wednesday, May 25, 2016 4:07 AM
> *To:* user@flink.apache.org
> *Subject:* Re: stream keyBy without repartition
>
> Hi,
> what Kostas said is correct.
>
> You can, however, hack it. You would have to manually instantiate a
> WindowOperator and apply it on the non-keyed DataStream while still
> providing a key selector (and serializer) for the state. This might sound
> complicated, but I'll try to walk you through the steps. Please let me know
> if anything is still unclear.
>
> ## Creating the WindowOperator
> This can be copied from WindowedStream.apply(R initialValue,
> FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function,
> TypeInformation<R> resultType):
>
> DataStream<T> input = ... // create stream from sources
>
> TypeInformation<R> resultType =
> TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
>         Utils.getCallLocationName(), true);
>
> if (foldFunction instanceof RichFunction) {
>     throw new UnsupportedOperationException(
>         "FoldFunction of apply can not be a RichFunction.");
> }
> if (windowAssigner instanceof MergingWindowAssigner) {
>     throw new UnsupportedOperationException(
>         "Fold cannot be used with a merging WindowAssigner.");
> }
>
> //clean the closures
> function = input.getExecutionEnvironment().clean(function);
> foldFunction = input.getExecutionEnvironment().clean(foldFunction);
>
> String callLocation = Utils.getCallLocationName();
> String udfName = "WindowedStream." + callLocation;
>
> String opName;
> KeySelector<T, K> keySel = ...; // your own KeySelector; the input is not keyed here, so there is nothing to get from it
>
> OneInputStreamOperator<T, R> operator;
>
> FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
>     initialValue,
>     foldFunction,
>     resultType);
>
> opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " +
> trigger + ", " + udfName + ")";
>
> operator = new WindowOperator<>(windowAssigner,
>     windowAssigner.getWindowSerializer(input.getExecutionEnvironment().getConfig()),
>     keySel,
>     keyType.createSerializer(input.getExecutionEnvironment().getConfig()), // keyType: the TypeInformation of your key
>     stateDesc,
>     new InternalSingleValueWindowFunction<>(function),
>     trigger);
>
> SingleOutputStreamOperator<R> result = input.transform(opName,
>     resultType, operator);
>
> ## Setting the KeySelector/Serializer for the state
> This can be copied from KeyedStream.transform:
>
> OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>)
>     result.getTransformation();
> transform.setStateKeySelector(keySelector); // this would be your
> KeySelector
> transform.setStateKeyType(keyType); // this would be a TypeInformation for
> your key type
>
> Now, "result" should be your pre-combined data that was not shuffled. On
> this you can keyBy your final key and apply a window in the normal way.
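>
> As a rough sketch of that last step (names like "finalKeySelector", the
> window size and "MyFinalReduce" are placeholders for whatever your job
> actually uses):
>
> DataStream<R> finalResult = result
>     .keyBy(finalKeySelector)        // regular keyBy, this is where the shuffle happens
>     .timeWindow(Time.minutes(1))    // or any other window assigner
>     .reduce(new MyFinalReduce());   // combines the pre-aggregated window results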
>
> Cheers,
> Aljoscha
>
>
> On Tue, 24 May 2016 at 17:45 Kostas Kloudas <k.klou...@data-artisans.com>
> wrote:
>
>> Hi Bart,
>>
>> From what I understand, you want to do a partial (per-node) aggregation
>> before shipping the results for the final aggregation at the end. In
>> addition, the keys do not seem to change between aggregations, right?
>>
>> If this is the case, this is the functionality of a combiner. In batch
>> (DataSet API) this is supported, but in streaming it is not.
>>
>> If your main concern is optimizing your already up-and-running job, it
>> would be worth sharing your code
>> (or an example with the same characteristics / communication patterns if
>> the real code is not possible)
>> so that we can have a look and potentially find other parts of the
>> pipeline that can be optimized.
>>
>> For example, given that you are concerned with the serialization
>> overhead, it may be worth
>> seeing if there are better alternatives to use.
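>>
>> As a sketch (class names are placeholders), registering your types, or a
>> custom Kryo serializer for them, on the environment can already help:
>>
>> env.registerType(MyEvent.class); // Kryo then writes a registration id instead of the full class name
>> env.registerTypeWithKryoSerializer(MyEvent.class, MyEventSerializer.class);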
>>
>> Kostas
>>
>>
>> On May 24, 2016, at 4:22 PM, Bart Wyatt <bart.wy...@dsvolition.com>
>> wrote:
>>
>> (migrated from IRC)
>>
>> Hello All,
>>
>> My situation is this:
>> I have a large amount of data partitioned in Kafka by "session" (natural
>> partitioning). After I read the data, I would like to do as much as
>> possible before incurring re-serialization or network traffic, due to the
>> size of the data. I am on 1.0.3 with the Java API.
>>
>> What I'd like to do is:
>>
>> while maintaining the natural partitioning (so that a single thread can
>> perform this), read data from Kafka and perform a windowed fold over the
>> incoming data keyed by a _different_ field ("key"); then take the product
>> of that windowed fold and allow re-partitioning to colocate data with
>> equivalent keys in a new partitioning scheme, where they can be reduced
>> into a final product. The hope is that the products of such a windowed
>> fold are orders of magnitude smaller than the data that would be
>> serialized/sent if we re-partitioned before the windowed fold.
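>>
>> As a sketch of what this looks like today (class and field names are
>> placeholders), the keyBy below is exactly the step that forces the
>> expensive shuffle of the raw data:
>>
>> DataStream<Event> raw = env.addSource(kafkaSource); // already partitioned by session in Kafka
>> DataStream<Partial> partials = raw
>>     .keyBy("key")                         // forces a network repartition by "key"
>>     .timeWindow(Time.minutes(1))
>>     .fold(new Partial(), new MyFold());   // the per-window pre-aggregation
>> // partials would then be keyed and reduced again into the final product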
>>
>> Is there a way to .keyBy(...) such that it will act within the physical
>> partitioning of the data and not force a re-partitioning of the data by
>> that key?
>>
>> thanks
>> -Bart
>>
>>
>>
>>
>>
>
>
