Hi! What you can do is have a static "broker", which is essentially a map from subtaskIndex to your ConcurrentHashMap. All tasks in a pipeline that have the same subtaskIndex will then grab the same ConcurrentHashMap.
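To make the idea concrete, here is a minimal sketch of such a static broker in plain Java. The class name SharedStateBroker, its methods, and the String/Long map types are hypothetical and only for illustration; this is not Flink's own Broker class, and it assumes all tasks run in the same JVM and are loaded by the same class loader.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper (not part of Flink): one shared map per subtask index.
final class SharedStateBroker {
    // Static registry: visible to every task that shares this JVM and class loader.
    private static final ConcurrentMap<Integer, ConcurrentHashMap<String, Long>> REGISTRY =
            new ConcurrentHashMap<>();

    private SharedStateBroker() {}

    // Returns the map for the given subtask index, creating it atomically on first use,
    // so tasks with the same index always receive the same instance.
    static ConcurrentHashMap<String, Long> forSubtask(int subtaskIndex) {
        return REGISTRY.computeIfAbsent(subtaskIndex, k -> new ConcurrentHashMap<>());
    }

    // Removes the entry, for example when the owning task shuts down,
    // so that stale state is not picked up by a later run.
    static void release(int subtaskIndex) {
        REGISTRY.remove(subtaskIndex);
    }
}
```

Each function in the chain would then look up its map once, for example in open(), by passing its own subtask index to SharedStateBroker.forSubtask(...).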
You can grab the subtask index if you have a RichFunction (such as RichMapFunction) and call getRuntimeContext().getIndexOfThisSubtask().

You can use the "org.apache.flink.runtime.iterative.concurrent.Broker" (part of "flink-runtime") for that. We use it to hand over shared control structures in iterations. The key would be the subtaskIndex.

Bear in mind that, in order for this to work well with the checkpointing, you will need to have the ConcurrentHashMap "owned" consistently by one of the tasks, while the other tasks retrieve references.

Greetings,
Stephan

On Fri, Jun 5, 2015 at 3:00 PM, William Saar <william.s...@king.com> wrote:

> Hi,
>
> How can I maintain a local state, for instance a ConcurrentHashMap, across
> different steps in a streaming chain on a single machine/process?
>
> Static variable? (This doesn’t seem to work well when running locally as
> it gets shared across multiple instances, a common "pipeline" store would
> be helpful)
>
> Is it OK to checkpoint such a local state in a single map operation at the
> beginning of the pipeline, or does it need to be done for every function?
>
> Will multiple groupBy steps using the same key selector output pass data
> to the same machines? (To preserve data locality)
>
> How can I do a fold/reduce operation that only returns its result after a
> full window has been processed, even when the processing in the window
> includes streams that have been distributed and merged from different
> machines using groupBy?
>
> My scenario is as follows
>
> I want to build up and partition a large state across different machines
> by using groupBy on a stream. The processing occurs in a window and some
> processing needs to be done on multiple machines so I want to do additional
> groupBy operators to pass partial results to other machines. Pseudo code:
>
> flattenedWindowStream = streamSource.groupBy(myKeySelector). // Initial partitioning
>     map(localStateSaverCheckpointMapper). // Checkpoint that saves local state, just passes through the data
>     window(Count(100)).flatten();
>
> localAndRemoteStream = flattenedWindowStream.split(event -> canBeProcessedLocally(event) ? "local" : "remote");
>
> remoteStream = localAndRemoteStream.select("remote").
>     map(partialProcessing). // Partially process what I can with my local state
>     groupBy(myKeySelector). // Send the partial processing to the machines that own the rest of the data
>     map(process);
>
> globalResult = localAndRemoteStream.select("local").map(process).union(remoteStream).broadcast(); // Broadcast all fully processed results to all machines
>
> globalResult.fold().addSink(globalWindowOutputSink) // fold/reduce, I want a result based on the full contents of the window
>
> Any help would be greatly appreciated!
>
> Thanks,
> William