Hi, what Kostas said is correct. You can, however, hack it: manually instantiate a WindowOperator and apply it to the non-keyed DataStream while still providing a key selector (and serializer) for state. This might sound complicated, but I'll try to walk you through the steps. Please let me know if anything is still unclear.
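Before the walkthrough: the steps below assume you already have a `KeySelector` for the field you want to fold by. As a point of reference, here is a minimal, Flink-free sketch of what such a selector looks like. The `Event` type and its fields are made up for illustration, and the `KeySelector` interface is a local stand-in with the same shape as `org.apache.flink.api.java.functions.KeySelector` (which you would implement directly in a real job):

```java
import java.io.Serializable;

public class KeySelectorSketch {
    // Stand-in for org.apache.flink.api.java.functions.KeySelector so this
    // sketch compiles without Flink on the classpath; the real interface
    // has the same single getKey method.
    interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    // Hypothetical record type: records arrive partitioned by sessionId,
    // but should be folded by the separate "key" field.
    static class Event {
        final String sessionId;
        final String key;
        final long value;
        Event(String sessionId, String key, long value) {
            this.sessionId = sessionId;
            this.key = key;
            this.value = value;
        }
    }

    // The selector you would hand to the WindowOperator and later set on
    // the transformation for state partitioning.
    static final KeySelector<Event, String> BY_KEY = e -> e.key;

    public static void main(String[] args) throws Exception {
        Event e = new Event("session-1", "k-42", 7L);
        System.out.println(BY_KEY.getKey(e)); // prints "k-42"
    }
}
```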
## Creating the WindowOperator

This can be copied from `WindowedStream.apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function, TypeInformation<R> resultType)`:

```java
DataStream<T> input = ... // create stream from sources

TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(
        foldFunction, input.getType(), Utils.getCallLocationName(), true);

if (foldFunction instanceof RichFunction) {
    throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
}
if (windowAssigner instanceof MergingWindowAssigner) {
    throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
}

// clean the closures
function = input.getExecutionEnvironment().clean(function);
foldFunction = input.getExecutionEnvironment().clean(foldFunction);

String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;

KeySelector<T, K> keySel = input.getKeySelector();

FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>(
        "window-contents", initialValue, foldFunction, resultType);

String opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", "
        + trigger + ", " + udfName + ")";

OneInputStreamOperator<T, R> operator = new WindowOperator<>(
        windowAssigner,
        windowAssigner.getWindowSerializer(input.getExecutionEnvironment().getConfig()),
        keySel,
        input.getKeyType().createSerializer(input.getExecutionEnvironment().getConfig()),
        stateDesc,
        new InternalSingleValueWindowFunction<>(function),
        trigger);

SingleOutputStreamOperator<R> result = input.transform(opName, resultType, operator);
```

Note that this is copied from a keyed context: since your `input` is a plain `DataStream`, `input.getKeySelector()` and `input.getKeyType()` do not exist there, so substitute your own `KeySelector` and key `TypeInformation`, the same ones you set on the transformation in the next step.

## Setting the KeySelector/Serializer for the state

This can be copied from `KeyedStream.transform`:

```java
OneInputTransformation<T, R> transform =
        (OneInputTransformation<T, R>) result.getTransformation();
transform.setStateKeySelector(keySelector); // this would be your KeySelector
transform.setStateKeyType(keyType);         // this would be a TypeInformation for your key type
```

Now, `result` should be your pre-combined data that was not shuffled. On this you can key by your other type and instantiate a WindowOperator in the normal way.

Cheers,
Aljoscha

On Tue, 24 May 2016 at 17:45 Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> Hi Bart,
>
> From what I understand, you want to do a partial (per-node) aggregation
> before shipping the result for the final one at the end. In addition,
> the keys do not seem to change between aggregations, right?
>
> If this is the case, this is the functionality of the Combiner in batch.
> In batch (the DataSet API) this is supported, but in streaming it is not.
>
> If your main concern is optimizing your already up-and-running job, it
> would be worth sharing your code (or an example with the same
> characteristics/communication patterns if the real code is not possible)
> so that we can have a look and potentially find other parts of the
> pipeline that can be optimized.
>
> For example, given that you are concerned with the serialization
> overhead, it may be worth seeing if there are better alternatives to use.
>
> Kostas
>
> On May 24, 2016, at 4:22 PM, Bart Wyatt <bart.wy...@dsvolition.com> wrote:
>
> (migrated from IRC)
>
> Hello All,
>
> My situation is this: I have a large amount of data partitioned in Kafka
> by "session" (natural partitioning). After I read the data, I would like
> to do as much as possible before incurring re-serialization or network
> traffic, due to the size of the data. I am on 1.0.3 in the Java API.
>
> What I'd like to do is:
>
> While maintaining the natural partitioning (so that a single thread can
> perform this), read data from Kafka, perform a windowed fold over the
> incoming data keyed by a _different_ field ("key"), then take the
> product of that windowed fold and allow re-partitioning to colocate data
> with equivalent keys in a new partitioning scheme, where they can be
> reduced into a final product.
> The hope is that the products of such a windowed fold are orders of
> magnitude smaller than the data that would be serialized/sent if we
> re-partitioned before the windowed fold.
>
> Is there a way to .keyBy(...) such that it will act within the physical
> partitioning of the data and not force a re-partitioning of the data by
> that key?
>
> Thanks,
> -Bart
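For anyone reading this thread later: the effect being discussed is essentially a combine step performed inside each partition before any shuffle. A small, Flink-free sketch of that idea, with illustrative types (string keys, long values folded with `Long::sum`):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionCombine {
    // Fold one partition's records by key before shipping them anywhere:
    // the result has at most one entry per distinct key, which is usually
    // far smaller than the raw record list.
    static Map<String, Long> combine(List<Map.Entry<String, Long>> records) {
        Map<String, Long> acc = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            // merge() folds the new value into any existing partial sum.
            acc.merge(r.getKey(), r.getValue(), Long::sum);
        }
        return acc;
    }

    public static void main(String[] args) {
        Map<String, Long> combined = combine(List.of(
                Map.entry("a", 1L), Map.entry("b", 2L), Map.entry("a", 3L)));
        // Three input records collapse to two keyed partials.
        System.out.println(combined.size()); // prints 2
    }
}
```

Only these combined partials would then need to cross the network for the final per-key reduction.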