In the long run we probably have to provide a hook in the API for this, yes.
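
For anyone skimming the thread quoted below: here is a minimal plain-Java sketch of the two-stage aggregation being discussed (fold by key inside each physical partition first, then ship only the partial results for the final merge). It deliberately does not use any Flink API. The names PreAggregationSketch, Event, localFold and finalMerge are hypothetical and exist only for illustration, and the sum stands in for whatever fold function the real job uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PreAggregationSketch {

    /** One record as read from a physical partition; "key" is the field we fold by. */
    public static final class Event {
        public final String session; // natural partitioning field
        public final String key;     // the different field used for the fold
        public final long value;

        public Event(String session, String key, long value) {
            this.session = session;
            this.key = key;
            this.value = value;
        }
    }

    /** Stage 1: fold the events of a single partition by key, with no shuffle. */
    public static Map<String, Long> localFold(List<Event> partition) {
        Map<String, Long> partial = new TreeMap<>();
        for (Event e : partition) {
            partial.merge(e.key, e.value, Long::sum);
        }
        return partial;
    }

    /** Stage 2: merge the per-partition partials; only these would be shipped. */
    public static Map<String, Long> finalMerge(List<Map<String, Long>> partials) {
        Map<String, Long> result = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two partitions, each holding the events of one session.
        List<Event> p0 = List.of(
                new Event("s1", "a", 1), new Event("s1", "b", 2), new Event("s1", "a", 3));
        List<Event> p1 = List.of(
                new Event("s2", "a", 10), new Event("s2", "a", 20), new Event("s2", "b", 5));

        List<Map<String, Long>> partials = new ArrayList<>();
        partials.add(localFold(p0));
        partials.add(localFold(p1));

        int read = p0.size() + p1.size();
        int shipped = partials.stream().mapToInt(Map::size).sum();
        System.out.println("records read: " + read + ", records shipped: " + shipped);
        System.out.println("final result: " + finalMerge(partials));
    }
}
```

The point of the sketch is only the shape of the data flow: the number of records crossing the partition boundary is bounded by the number of distinct keys per partition, not by the number of input records.
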

On Wed, 25 May 2016 at 15:54 Bart Wyatt <bart.wy...@dsvolition.com> wrote:

> I will give this a shot this morning.
>
> Considering this and the other email "Does Kafka connector leverage Kafka
> message keys?" which also ends up talking about hacking around
> KeyedStream's use of a HashPartitioner<>(...) is it worth looking in to
> providing a KeyedStream constructor that uses a ForwardPartitioner? This
> was what I was going to try this morning until you gave me a path that
> doesn't involve editing flink code.
>
> -Bart
>
> ------------------------------
> *From:* Aljoscha Krettek <aljos...@apache.org>
> *Sent:* Wednesday, May 25, 2016 4:07 AM
> *To:* user@flink.apache.org
> *Subject:* Re: stream keyBy without repartition
>
> Hi,
> what Kostas said is correct.
>
> You can however, hack it. You would have to manually instantiate a
> WindowOperator and apply it on the non-keyed DataStream while still
> providing a key-selector (and serializer) for state. This might sound
> complicated but I'll try and walk you through the steps. Please let me know
> if anything is unclear, still.
>
> ## Creating the WindowOperator
> This can be copied from WindowedStream.apply(R initialValue,
> FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function,
> TypeInformation<R> resultType):
>
> DataStream<> input = ...
> // create stream from sources
>
> TypeInformation<R> resultType =
>     TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
>         Utils.getCallLocationName(), true);
>
> if (foldFunction instanceof RichFunction) {
>     throw new UnsupportedOperationException(
>         "FoldFunction of apply can not be a RichFunction.");
> }
> if (windowAssigner instanceof MergingWindowAssigner) {
>     throw new UnsupportedOperationException(
>         "Fold cannot be used with a merging WindowAssigner.");
> }
>
> // clean the closures
> function = input.getExecutionEnvironment().clean(function);
> foldFunction = input.getExecutionEnvironment().clean(foldFunction);
>
> String callLocation = Utils.getCallLocationName();
> String udfName = "WindowedStream." + callLocation;
>
> String opName;
> // Copied as-is from WindowedStream, where input is a KeyedStream; on a
> // plain DataStream, supply your own KeySelector and key type instead.
> KeySelector<T, K> keySel = input.getKeySelector();
>
> OneInputStreamOperator<T, R> operator;
>
> FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>(
>     "window-contents",
>     initialValue,
>     foldFunction,
>     resultType);
>
> opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " +
>     trigger + ", " + udfName + ")";
>
> operator = new WindowOperator<>(windowAssigner,
>     windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
>     keySel,
>     input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
>     stateDesc,
>     new InternalSingleValueWindowFunction<>(function),
>     trigger);
>
> SingleOutputStreamOperator<> result = input.transform(opName,
>     resultType, operator);
>
> ## Setting the KeySelector/Serializer for the state
> This can be copied from KeyedStream.transform:
>
> OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>)
>     result.getTransformation();
> transform.setStateKeySelector(keySelector); // this would be your KeySelector
> transform.setStateKeyType(keyType); // this would be a TypeInformation for your key type
>
> Now, "result" should be your pre-combined data that was not shuffled.
> On this you can key by your other type and instantiate a WindowOperator in the
> normal way.
>
> Cheers,
> Aljoscha
>
> On Tue, 24 May 2016 at 17:45 Kostas Kloudas <k.klou...@data-artisans.com>
> wrote:
>
>> Hi Bart,
>>
>> From what I understand, you want to do a partial (per node) aggregation
>> before shipping the result
>> for the final one at the end. In addition, the keys do not seem to change
>> between aggregations, right?
>>
>> If this is the case, this is the functionality of the Combiner in batch.
>> In Batch (DataSet API) this is supported, but in Streaming it is not.
>>
>> If your main concern is optimizing your already up-and-running job, it
>> would be worth sharing your code
>> (or an example with the same characteristics / communication patterns if
>> the real code is not possible)
>> so that we can have a look and potentially find other parts of the
>> pipeline that can be optimized.
>>
>> For example, given that you are concerned with the serialization
>> overhead, it may be worth
>> seeing if there are better alternatives to use.
>>
>> Kostas
>>
>> On May 24, 2016, at 4:22 PM, Bart Wyatt <bart.wy...@dsvolition.com>
>> wrote:
>>
>> (migrated from IRC)
>>
>> Hello All,
>>
>> My situation is this:
>> I have a large amount of data partitioned in kafka by "session" (natural
>> partitioning). After I read the data, I would like to do as much as
>> possible before incurring re-serialization or network traffic due to the
>> size of the data. I am on 1.0.3 in the java API.
>>
>> What I'd like to do is:
>>
>> while maintaining the natural partitioning (so that a single thread can
>> perform this) read data from kafka, perform a window'd fold over the
>> incoming data keyed by a _different_ field("key") then take the product of
>> that window'd fold and allow re-partitioning to colocate data with
>> equivalent keys in a new partitioning scheme where they can be reduced into
>> a final product.
>> The hope is that the products of such a windowed fold are
>> orders of magnitude smaller than the data that would be serialized/sent if
>> we re-partitioned before the window'd fold.
>>
>> Is there a way to .keyBy(...) such that it will act within the physical
>> partitioning of the data and not force a re-partitioning of the data by
>> that key?
>>
>> thanks
>> -Bart
>>
>> ------------------------------
>> This e-mail may contain CONFIDENTIAL AND PROPRIETARY INFORMATION and/or
>> PRIVILEGED AND CONFIDENTIAL COMMUNICATION intended solely for the recipient
>> and, therefore, may not be retransmitted to any party outside of the
>> recipient's organization without the prior written consent of the sender.
>> If you have received this e-mail in error please notify the sender
>> immediately by telephone or reply e-mail and destroy the original message
>> without making a copy. Deep Silver, Inc. accepts no liability for any
>> losses or damages resulting from infected e-mail transmissions and viruses
>> in e-mail attachments.