If you are mutating accumulators, you might instead blind-write the inputs and let the system manage the combining. I'd have to see the body of @ProcessElement to say more.
Kenn

On Mon, Apr 16, 2018 at 3:33 PM Kenneth Knowles <[email protected]> wrote:

> There's actually been a rename since that notion of CombiningState. It
> used to be that BagState (just blind writes) and CombiningValueState
> (uses a CombineFn) were both instances of CombiningState (any
> automatically mergeable thing).
>
> Now the names are BagState (blind writes) and CombiningState (uses a
> CombineFn), which are instances of GroupingState (automatically mergeable -
> you might wonder why we didn't call it MergeableState...)
>
> Kenn
>
> On Mon, Apr 16, 2018 at 12:14 PM Reuven Lax <[email protected]> wrote:
>
>> Out of curiosity, what are you using CombiningState for? I believe it is
>> intended for use in merging windows (such as sessions), however those
>> windows are not yet supported with state.
>>
>> Reuven
>>
>> On Fri, Apr 13, 2018 at 2:42 AM Ankur Chauhan <[email protected]>
>> wrote:
>>
>>> Hi all,
>>>
>>> I recently updated my dataflow pipeline to 2.4.0 sdk and found that my
>>> stateful DoFn with the following statespec is throwing
>>> java.lang.UnsupportedOperationException.
>>>
>>> For reference the job information is:
>>>
>>> - job-id: 2018-04-11_12_11_36-1181436984489583563
>>>
>>> The same code seems to work correctly, i.e. without problems, in 2.3.0
>>>
>>> @StateId("indexKeys")
>>> // this is the state spec needed by beam to figure out the state spec / type requirements at runtime
>>> private final StateSpec<CombiningState<KV<String, KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String, ByteString>>> INDEX_KEYS_SPEC = StateSpecs.combining(new IndexStateCombineFn());
>>>
>>> The exception is:
>>>
>>> java.lang.UnsupportedOperationException
>>>   java.util.AbstractMap.put(AbstractMap.java:209)
>>>   com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:22)
>>>   com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:11)
>>>   com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:920)
>>>   com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.emitIndexKeys(GenerateMutationsFn.java:195)
>>>   com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.processElement(GenerateMutationsFn.java:160)
>>>
>>> The combine fn is:
>>>
>>> import com.google.common.collect.Maps;
>>> import com.google.protobuf.ByteString;
>>> import org.apache.beam.sdk.transforms.Combine;
>>> import org.apache.beam.sdk.values.KV;
>>>
>>> import java.util.Map;
>>>
>>> // this combiner ensures that we keep track of the most recent value of each key in the map
>>> public class IndexStateCombineFn extends Combine.CombineFn<KV<String, KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String, ByteString>> {
>>>   @Override
>>>   public Map<String, KV<Long, ByteString>> createAccumulator() {
>>>     return Maps.newHashMap();
>>>   }
>>>
>>>   @Override
>>>   public Map<String, KV<Long, ByteString>> addInput(Map<String, KV<Long, ByteString>> accumulator, KV<String, KV<Long, ByteString>> input) {
>>>     String id = input.getKey();
>>>     KV<Long, ByteString> indexKey = input.getValue();
>>>     if (!accumulator.containsKey(id)) {
>>>       accumulator.put(id, indexKey);
>>>     } else {
>>>       KV<Long, ByteString> prevVal = accumulator.get(id);
>>>       if (prevVal == null || prevVal.getKey() <= indexKey.getKey()) {
>>>         // input is newer than what we have in the map, store it
>>>         accumulator.put(id, indexKey);
>>>       }
>>>     }
>>>     return accumulator;
>>>   }
>>>
>>>   @Override
>>>   public Map<String, KV<Long, ByteString>> mergeAccumulators(Iterable<Map<String, KV<Long, ByteString>>> accumulators) {
>>>     Map<String, KV<Long, ByteString>> merged = null;
>>>     for (Map<String, KV<Long, ByteString>> accumulator : accumulators) {
>>>       if (merged == null) {
>>>         merged = accumulator;
>>>       } else {
>>>         for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
>>>           String indexId = entry.getKey();
>>>           KV<Long, ByteString> v = entry.getValue();
>>>           if (!merged.containsKey(indexId)) {
>>>             merged.put(indexId, v);
>>>           } else {
>>>             KV<Long, ByteString> old = merged.get(indexId);
>>>             if (old.getKey() < v.getKey()) {
>>>               merged.put(indexId, v);
>>>             }
>>>           }
>>>         }
>>>       }
>>>     }
>>>     return merged;
>>>   }
>>>
>>>   @Override
>>>   public Map<String, ByteString> extractOutput(Map<String, KV<Long, ByteString>> accumulator) {
>>>     Map<String, ByteString> output = Maps.newHashMapWithExpectedSize(accumulator.size());
>>>     for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
>>>       output.put(entry.getKey(), entry.getValue().getValue());
>>>     }
>>>     return output;
>>>   }
>>> }
>>>
>>> The exception seems to indicate that WindmillStateInternals may be
>>> returning an ImmutableMap but I can’t say for sure. Based on the
>>> javadoc for addInput, the accumulator should be mutable.
>>>
>>> Has anyone else seen this issue?
>>>
>>> -- Ankur Chauhan
>>>
>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "dataflow-feedback" group.
>>> To view this discussion on the web visit
>>> https://groups.google.com/a/google.com/d/msgid/dataflow-feedback/CAEFbgUQau3VZK3k6ss24KOu5Owim8zAmRQfXScrhC0KjzpL%2Bbg%40mail.gmail.com
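For reference, one way to sidestep the UnsupportedOperationException in the quoted IndexStateCombineFn might be to never write into a map handed back by the runner, and instead copy the incoming accumulator into a fresh HashMap in addInput and mergeAccumulators. The sketch below is plain Java with no Beam dependency: Map.Entry<Long, String> stands in for KV<Long, ByteString>, and the class name DefensiveIndexCombine is hypothetical. It assumes the failure comes from an unmodifiable accumulator (for example Collections.emptyMap()), which the stack trace suggests but the thread does not confirm.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for IndexStateCombineFn. Map.Entry<Long, String>
// plays the role of KV<Long, ByteString> so this compiles without Beam.
public class DefensiveIndexCombine {

  // Copies the accumulator before writing, so an unmodifiable input is harmless.
  static Map<String, Map.Entry<Long, String>> addInput(
      Map<String, Map.Entry<Long, String>> accumulator,
      String id,
      Map.Entry<Long, String> indexKey) {
    Map<String, Map.Entry<Long, String>> copy = new HashMap<>(accumulator);
    Map.Entry<Long, String> prev = copy.get(id);
    // Same rule as the quoted addInput: store when absent or not older.
    if (prev == null || prev.getKey() <= indexKey.getKey()) {
      copy.put(id, indexKey);
    }
    return copy;
  }

  // Merges into a new map instead of reusing the first accumulator.
  // Unlike the quoted code, an empty input yields an empty map, not null.
  static Map<String, Map.Entry<Long, String>> mergeAccumulators(
      Iterable<Map<String, Map.Entry<Long, String>>> accumulators) {
    Map<String, Map.Entry<Long, String>> merged = new HashMap<>();
    for (Map<String, Map.Entry<Long, String>> acc : accumulators) {
      for (Map.Entry<String, Map.Entry<Long, String>> e : acc.entrySet()) {
        Map.Entry<Long, String> old = merged.get(e.getKey());
        // Same rule as the quoted mergeAccumulators: strictly newer wins.
        if (old == null || old.getKey() < e.getValue().getKey()) {
          merged.put(e.getKey(), e.getValue());
        }
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // Start from an unmodifiable map, the case assumed to trigger the error.
    Map<String, Map.Entry<Long, String>> a =
        addInput(Collections.emptyMap(), "a", new SimpleImmutableEntry<>(1L, "v1"));
    Map<String, Map.Entry<Long, String>> b =
        addInput(Collections.emptyMap(), "a", new SimpleImmutableEntry<>(2L, "v2"));
    System.out.println(mergeAccumulators(Arrays.asList(a, b)).get("a").getValue());
  }
}
```

The copy costs time proportional to the accumulator size on every added element, so for large maps the blind-write approach Kenn mentions above may be the better fit.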
