Hey,

As for the second part of your question:
If you want to apply transformations such as reduce on windows, you need to create a windowed data stream and apply your groupBy and reduce transformations to this WindowedDataStream before calling .flatten(). A bare stream.window(..).flatten() has no effect, because flatten turns the windowed stream back into a simple DataStream.

I suggest reading through the windowing section of the programming guide:
http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#window-operators

I also attached a picture illustrating some windowing basics:
https://drive.google.com/file/d/0B5gt88bgWabkMzg2ZzBIREw1WG8/view?usp=sharing

I hope this helps :) Let me know if you have further questions.

Regards,
Gyula

On Fri, 5 Jun 2015 at 15:02, William Saar <william.s...@king.com> wrote:
> Hi,
>
> How can I maintain a local state, for instance a ConcurrentHashMap, across
> different steps in a streaming chain on a single machine/process?
>
> Static variable? (This doesn’t seem to work well when running locally, as
> it gets shared across multiple instances; a common “pipeline” store would
> be helpful.)
>
> Is it OK to checkpoint such a local state in a single map operation at the
> beginning of the pipeline, or does it need to be done for every function?
>
> Will multiple groupBy steps using the same key selector output pass data
> to the same machines? (To preserve data locality)
>
> How can I do a fold/reduce operation that only returns its result after a
> full window has been processed, even when the processing in the window
> includes streams that have been distributed and merged from different
> machines using groupBy?
>
> My scenario is as follows:
>
> I want to build up and partition a large state across different machines
> by using groupBy on a stream. The processing occurs in a window, and some
> processing needs to be done on multiple machines, so I want to use
> additional groupBy operators to pass partial results to other machines.
>
> Pseudo code:
>
>     flattenedWindowStream = streamSource.groupBy(myKeySelector) // Initial partitioning
>         .map(localStateSaverCheckpointMapper) // Checkpoint that saves local state, just passes through the data
>         .window(Count(100)).flatten();
>
>     localAndRemoteStream = flattenedWindowStream.split(event ->
>         canBeProcessedLocally(event) ? "local" : "remote");
>
>     remoteStream = localAndRemoteStream.select("remote")
>         .map(partialProcessing) // Partially process what I can with my local state
>         .groupBy(myKeySelector) // Send the partial processing to the machines that own the rest of the data
>         .map(process);
>
>     globalResult = localAndRemoteStream.select("local").map(process)
>         .union(remoteStream)
>         .broadcast(); // Broadcast all fully processed results to all machines
>
>     globalResult.fold().addSink(globalWindowOutputSink); // fold/reduce, I want a result based on the full contents of the window
>
> Any help would be greatly appreciated!
>
> Thanks,
> William
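To make the point about windowing in the reply above concrete, here is a rough plain-Java model of count-based windows over an in-memory list. It is not Flink code and uses no Flink API; the class WindowSemanticsSketch, the Event type and both methods are hypothetical names chosen only for illustration. windowThenFlatten models stream.window(..).flatten() and simply hands back the input, while groupAndReducePerWindow models applying groupBy and reduce to the windowed stream before flattening, producing one reduced value per key for each complete window. Dropping a trailing incomplete window is a simplifying assumption of this sketch, not a statement about how Flink treats the end of a stream.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/**
 * Plain-Java model of count-based tumbling windows on an in-memory list.
 * This is NOT the Flink API; it only simulates the semantics in the reply.
 */
public class WindowSemanticsSketch {

    /** Hypothetical event type: a key plus an integer payload. */
    static final class Event {
        final String key;
        final int value;

        Event(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Models stream.window(Count(n)).flatten(): the events are cut into
     * windows and immediately concatenated again, so output equals input.
     */
    static List<Event> windowThenFlatten(List<Event> stream, int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        List<Event> out = new ArrayList<>();
        for (int start = 0; start < stream.size(); start += windowSize) {
            int end = Math.min(start + windowSize, stream.size());
            out.addAll(stream.subList(start, end)); // no transformation applied
        }
        return out;
    }

    /**
     * Models groupBy(key) + reduce applied to the windowed stream before
     * flattening: each complete window yields one reduced value per key.
     * Assumption of this sketch: a trailing incomplete window emits nothing.
     */
    static List<Map<String, Integer>> groupAndReducePerWindow(
            List<Event> stream, int windowSize, BinaryOperator<Integer> reducer) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        List<Map<String, Integer>> results = new ArrayList<>();
        for (int start = 0; start + windowSize <= stream.size(); start += windowSize) {
            Map<String, Integer> perKey = new LinkedHashMap<>();
            for (Event e : stream.subList(start, start + windowSize)) {
                perKey.merge(e.key, e.value, reducer); // reduce within this window only
            }
            results.add(perKey);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Event> stream = List.of(
                new Event("a", 1), new Event("b", 2), new Event("a", 3),
                new Event("b", 4), new Event("a", 5), new Event("a", 6),
                new Event("b", 7));

        // Same events in the same order: flatten() alone undoes the windowing.
        System.out.println(windowThenFlatten(stream, 3).equals(stream));

        // Windows of 3 events; the trailing ("b", 7) window is incomplete and not emitted.
        System.out.println(groupAndReducePerWindow(stream, 3, Integer::sum));
    }
}
```

Running main prints true for the flatten case and [{a=4, b=2}, {b=4, a=11}] for the grouped reduce, i.e. one result per key per full window rather than one output per input event.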