There's actually been a rename since that notion of CombiningState. It used to be that BagState (just blind writes) and CombiningValueState (uses a CombineFn) were both instances of CombiningState (any automatically mergeable thing).
Now the names are BagState (blind writes) and CombiningState (uses a CombineFn), which are instances of GroupingState (automatically mergeable - you might wonder why we didn't call it MergeableState...)

Kenn

On Mon, Apr 16, 2018 at 12:14 PM Reuven Lax wrote:

> Out of curiosity, what are you using CombiningState for? I believe it is
> intended for use in merging windows (such as sessions), however those
> windows are not yet supported with state.
>
> Reuven
>
> On Fri, Apr 13, 2018 at 2:42 AM Ankur Chauhan wrote:
>
>> Hi all,
>>
>> I recently updated my dataflow pipeline to the 2.4.0 sdk and found that my
>> stateful DoFn with the following statespec is throwing
>> java.lang.UnsupportedOperationException.
>>
>> For reference the job information is:
>>
>> - job-id: 2018-04-11_12_11_36-1181436984489583563
>>
>> The same code seems to work correctly, i.e. without problems, in 2.3.0
>>
>>     @StateId("indexKeys")
>>     // this is the state spec needed by beam to figure out the state spec / type requirements at runtime
>>     private final StateSpec<CombiningState<KV<String, KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String, ByteString>>>
>>         INDEX_KEYS_SPEC = StateSpecs.combining(new IndexStateCombineFn());
>>
>> The exception is:
>>
>>     java.lang.UnsupportedOperationException
>>     java.util.AbstractMap.put(AbstractMap.java:209)
>>     com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:22)
>>     com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:11)
>>     com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:920)
>>     com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.emitIndexKeys(GenerateMutationsFn.java:195)
>>     com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.processElement(GenerateMutationsFn.java:160)
>>
>> The combine fn is:
>>
>>     import com.google.common.collect.Maps;
>>     import com.google.protobuf.ByteString;
>>     import org.apache.beam.sdk.transforms.Combine;
>>     import org.apache.beam.sdk.values.KV;
>>
>>     import java.util.Map;
>>
>>     // this combiner ensures that we keep track of the most recent value of each key in the map
>>     public class IndexStateCombineFn extends Combine.CombineFn<KV<String, KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String, ByteString>> {
>>         @Override
>>         public Map<String, KV<Long, ByteString>> createAccumulator() {
>>             return Maps.newHashMap();
>>         }
>>
>>         @Override
>>         public Map<String, KV<Long, ByteString>> addInput(Map<String, KV<Long, ByteString>> accumulator, KV<String, KV<Long, ByteString>> input) {
>>             String id = input.getKey();
>>             KV<Long, ByteString> indexKey = input.getValue();
>>             if (!accumulator.containsKey(id)) {
>>                 accumulator.put(id, indexKey);
>>             } else {
>>                 KV<Long, ByteString> prevVal = accumulator.get(id);
>>                 if (prevVal == null || prevVal.getKey() <= indexKey.getKey()) {
>>                     // input is newer than what we have in the map, store it
>>                     accumulator.put(id, indexKey);
>>                 }
>>             }
>>             return accumulator;
>>         }
>>
>>         @Override
>>         public Map<String, KV<Long, ByteString>> mergeAccumulators(Iterable<Map<String, KV<Long, ByteString>>> accumulators) {
>>             Map<String, KV<Long, ByteString>> merged = null;
>>             for (Map<String, KV<Long, ByteString>> accumulator : accumulators) {
>>                 if (merged == null) {
>>                     merged = accumulator;
>>                 } else {
>>                     for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
>>                         String indexId = entry.getKey();
>>                         KV<Long, ByteString> v = entry.getValue();
>>                         if (!merged.containsKey(indexId)) {
>>                             merged.put(indexId, v);
>>                         } else {
>>                             KV<Long, ByteString> old = merged.get(indexId);
>>                             if (old.getKey() < v.getKey()) {
>>                                 merged.put(indexId, v);
>>                             }
>>                         }
>>                     }
>>                 }
>>             }
>>             return merged;
>>         }
>>
>>         @Override
>>         public Map<String, ByteString> extractOutput(Map<String, KV<Long, ByteString>> accumulator) {
>>             Map<String, ByteString> output = Maps.newHashMapWithExpectedSize(accumulator.size());
>>             for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
>>                 output.put(entry.getKey(), entry.getValue().getValue());
>>             }
>>             return output;
>>         }
>>     }
>>
>> The exception seems to suggest that WindmillStateInternals may be
>> returning an ImmutableMap, but I can’t say for sure. Based on the javadoc
>> for addInput, the accumulator should be mutable.
>>
>> Has anyone else seen this issue?
>>
>> — Ankur Chauhan
>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "dataflow-feedback" group.
>> To view this discussion on the web visit
>> https://groups.google.com/a/google.com/d/msgid/dataflow-feedback/CAEFbgUQau3VZK3k6ss24KOu5Owim8zAmRQfXScrhC0KjzpL%2Bbg%40mail.gmail.com
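
Editor's sketch: if the suspicion in the original message is right and the accumulator handed to addInput can be an unmodifiable map, one defensive option is to copy the accumulator before writing to it, and to merge into a fresh map rather than into the first accumulator. The thread does not confirm that this is the cause. The class below is a hypothetical stand-in for IndexStateCombineFn: it does not extend Beam's Combine.CombineFn, and it uses Map.Entry<Long, String> in place of KV<Long, ByteString>, purely so that it compiles with the JDK alone. The name DefensiveIndexCombiner and the method signatures are assumptions made for illustration.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Beam-free stand-in for IndexStateCombineFn (illustration only).
final class DefensiveIndexCombiner {

  static Map<String, Map.Entry<Long, String>> createAccumulator() {
    return new HashMap<>();
  }

  // Copies the incoming accumulator before writing, so put() is never called
  // on a map the caller may have made unmodifiable.
  static Map<String, Map.Entry<Long, String>> addInput(
      Map<String, Map.Entry<Long, String>> accumulator,
      String id, long version, String value) {
    Map<String, Map.Entry<Long, String>> copy = new HashMap<>(accumulator);
    Map.Entry<Long, String> prev = copy.get(id);
    // Same rule as the original: store when absent, or when the input is at
    // least as new as the stored entry.
    if (prev == null || prev.getKey() <= version) {
      copy.put(id, new AbstractMap.SimpleImmutableEntry<>(version, value));
    }
    return copy;
  }

  // Merges into a fresh map instead of reusing the first accumulator.
  static Map<String, Map.Entry<Long, String>> mergeAccumulators(
      Iterable<Map<String, Map.Entry<Long, String>>> accumulators) {
    Map<String, Map.Entry<Long, String>> merged = new HashMap<>();
    for (Map<String, Map.Entry<Long, String>> accumulator : accumulators) {
      for (Map.Entry<String, Map.Entry<Long, String>> e : accumulator.entrySet()) {
        Map.Entry<Long, String> old = merged.get(e.getKey());
        if (old == null || old.getKey() < e.getValue().getKey()) {
          merged.put(e.getKey(), e.getValue());
        }
      }
    }
    return merged;
  }

  // Drops the version and keeps only the value for each key.
  static Map<String, String> extractOutput(
      Map<String, Map.Entry<Long, String>> accumulator) {
    Map<String, String> output = new HashMap<>();
    for (Map.Entry<String, Map.Entry<Long, String>> e : accumulator.entrySet()) {
      output.put(e.getKey(), e.getValue().getValue());
    }
    return output;
  }
}
```

The trade-off is that addInput now copies the whole map on every element, which costs time proportional to the accumulator size. In a real CombineFn it may be cheaper to copy only when the incoming map turns out not to be a HashMap, but that check is itself an assumption about what the runner passes in.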
