Hi,
what Kostas said is correct.

You can, however, hack it. You would have to manually instantiate a
WindowOperator and apply it to the non-keyed DataStream while still
providing a key selector (and serializer) for state. This might sound
complicated, but I'll try to walk you through the steps. Please let me know
if anything is still unclear.

## Creating the WindowOperator
This can be copied from WindowedStream.apply(R initialValue,
FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function,
TypeInformation<R> resultType):

DataStream<T> input = ... // create stream from sources

TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(
        foldFunction, input.getType(), Utils.getCallLocationName(), true);

if (foldFunction instanceof RichFunction) {
    throw new UnsupportedOperationException(
        "FoldFunction of apply can not be a RichFunction.");
}
if (windowAssigner instanceof MergingWindowAssigner) {
    throw new UnsupportedOperationException(
        "Fold cannot be used with a merging WindowAssigner.");
}

//clean the closures
function = input.getExecutionEnvironment().clean(function);
foldFunction = input.getExecutionEnvironment().clean(foldFunction);

String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;

String opName;
// a non-keyed DataStream has no getKeySelector(), so supply your own here
KeySelector<T, K> keySel = ...;

OneInputStreamOperator<T, R> operator;

FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>(
    "window-contents",
    initialValue,
    foldFunction,
    resultType);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", "
    + trigger + ", " + udfName + ")";

// keyType is the TypeInformation<K> for your key; a non-keyed DataStream
// has no getKeyType(), so it has to be provided by you
operator = new WindowOperator<>(windowAssigner,
    windowAssigner.getWindowSerializer(input.getExecutionEnvironment().getConfig()),
    keySel,
    keyType.createSerializer(input.getExecutionEnvironment().getConfig()),
    stateDesc,
    new InternalSingleValueWindowFunction<>(function),
    trigger);

// transform(...) returns a SingleOutputStreamOperator, which is a DataStream<R>
DataStream<R> result = input.transform(opName, resultType, operator);

## Setting the KeySelector/Serializer for the state
This can be copied from KeyedStream.transform:

OneInputTransformation<T, R> transform =
    (OneInputTransformation<T, R>) result.getTransformation();
transform.setStateKeySelector(keySelector); // this would be your KeySelector
transform.setStateKeyType(keyType); // this would be a TypeInformation for your key type

Now, "result" should be your pre-combined data that was not shuffled. On
this stream you can key by your other type and instantiate a WindowOperator
in the normal way.
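
To make the data flow concrete, here is a minimal plain-Java sketch of the
two stages, with no Flink involved. Everything in it (the Event class, the
method names, the sample values, summing as the fold) is a hypothetical
stand-in chosen for illustration; it only shows why the pre-combined output
is small and why the final per-key result is the same as aggregating
everything in one place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PreCombineSketch {

    // Hypothetical record: lives in some source partition, aggregated by "key".
    static final class Event {
        final String key;
        final long value;

        Event(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stage 1: fold the events of ONE partition per key, without moving data.
    // This plays the role of the window'd fold on the non-shuffled stream.
    static Map<String, Long> foldPartition(List<Event> partition) {
        Map<String, Long> partial = new TreeMap<>();
        for (Event e : partition) {
            partial.merge(e.key, e.value, Long::sum);
        }
        return partial;
    }

    // Stage 2: only the (much smaller) partial results are brought together
    // by key and reduced into the final value per key.
    static Map<String, Long> reducePartials(List<Map<String, Long>> partials) {
        Map<String, Long> result = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }

    // Runs both stages over two made-up partitions.
    static Map<String, Long> run() {
        List<Event> partition0 = Arrays.asList(
            new Event("a", 1), new Event("b", 2), new Event("a", 3));
        List<Event> partition1 = Arrays.asList(
            new Event("b", 4), new Event("a", 5));

        List<Map<String, Long>> partials = new ArrayList<>();
        partials.add(foldPartition(partition0));
        partials.add(foldPartition(partition1));
        return reducePartials(partials);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints {a=9, b=6}
    }
}
```

In the Flink job, stage 1 corresponds to the manually created WindowOperator
with the state key selector set, and stage 2 to the regular keyBy plus
window on "result".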

Cheers,
Aljoscha


On Tue, 24 May 2016 at 17:45 Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Hi Bart,
>
> From what I understand, you want to do a partial (per node) aggregation
> before shipping the result
> for the final one at the end. In addition, the keys do not seem to change
> between aggregations, right?
>
> If this is the case, this is the functionality of the Combiner in batch.
> In Batch (DataSet API) this is supported, but in Streaming it is not.
>
> If your main concern is optimizing your already up-and-running job, it
> would be worth sharing your code
> (or an example with the same characteristics / communication patterns if
> the real code is not possible)
> so that we can have a look and potentially find other parts of the
> pipeline that can be optimized.
>
> For example, given that you are concerned with the serialization overhead,
> it may be worth
> seeing if there are better alternatives to use.
>
> Kostas
>
>
> On May 24, 2016, at 4:22 PM, Bart Wyatt <bart.wy...@dsvolition.com> wrote:
>
> (migrated from IRC)
>
> Hello All,
>
> My situation is this:
> I have a large amount of data partitioned in kafka by "session" (natural
> partitioning).  After I read the data, I would like to do as much as
> possible before incurring re-serialization or network traffic due to the
> size of the data.  I am on 1.0.3 in the java API.
>
> What I'd like to do is:
>
> while maintaining the natural partitioning (so that a single thread can
> perform this) read data from kafka, perform a window'd fold over the
> incoming data keyed by a _different_ field("key") then take the product of
> that window'd fold and allow re-partitioning to colocate data with
> equivalent keys in a new partitioning scheme where they can be reduced into
> a final product.  The hope is that the products of such a windowed fold are
> orders of magnitude smaller than the data that would be serialized/sent if
> we re-partitioned before the window'd fold.
>
> Is there a way to .keyBy(...) such that it will act within the physical
> partitioning of the data and not force a  re-partitioning of the data by
> that key?
>
> thanks
> -Bart
>
>
