Hi Robin,

I have the same question here. If we don't implement a coder for class Accum but only add "implements Serializable" to it, how is the accumulator coder generated?
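A hedged note on the question above. As far as I understand the Beam Java SDK (this is not confirmed anywhere in the thread), a CombineFn that does not override getAccumulatorCoder has its accumulator coder inferred from the pipeline's CoderRegistry. For a class that implements Serializable, the registry can fall back to SerializableCoder, which encodes values with standard Java serialization. The sketch below uses only java.io and no Beam classes. It round-trips an Accum-like object through ObjectOutputStream/ObjectInputStream to show roughly what that fallback relies on. The class name SerializableRoundTrip and its helpers are illustrative, not part of Beam.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableRoundTrip {

    // Mirrors the thread's Accum: Serializable, with the field initialized.
    public static class Accum implements Serializable {
        private static final long serialVersionUID = 1L;
        String line = "";
    }

    // Encodes with plain Java serialization (illustrative helper, not a Beam API).
    static byte[] encode(Accum accum) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(accum);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Closing the ObjectOutputStream above flushed everything into 'bytes'.
        return bytes.toByteArray();
    }

    // Decodes bytes produced by encode back into a new Accum instance.
    static Accum decode(byte[] encoded) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(encoded))) {
            return (Accum) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Accum original = new Accum();
        original.line = "x,y";
        Accum copy = decode(encode(original));
        System.out.println(copy.line);          // prints x,y
        System.out.println(copy != original);   // prints true: a distinct object
    }
}
```

The decoded accumulator is a new object with the same field values. Java serialization handles this with no extra code, which is presumably why adding "implements Serializable" was enough to get past the original "Unable to return a default Coder" error.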
-Rui

On Fri, Aug 17, 2018 at 11:50 AM Mahesh Vangala <vangalamahe...@gmail.com> wrote:
> Thanks, Robin.
> Based on Robin's comment above, I looked into the CombineFn test script in
> the Beam git repo and implemented a getCoder method along the lines of that
> script
> (https://github.com/vangalamaheshh/my-beam/blob/master/variant-caller/src/main/java/pipelines/variant_caller/AddLines.java).
> Do you think the getCoder implementation is not necessary?
> Thanks for your help, though.
> Much appreciated!
>
> - Mahesh
>
> --
> Mahesh Vangala
> (Ph) 443-326-1957
> (web) mvangala.com <http://mvangala.com>
>
> On Fri, Aug 17, 2018 at 1:01 PM Robin Qiu <robi...@google.com> wrote:
>
>> Hi Mahesh,
>>
>> I think you have the NullPointerException because your accumulator is not
>> initialized properly.
>>
>> In your createAccumulator() method, you created an Accum object without
>> setting its line field. So later, when accum.line was accessed, you got the
>> exception.
>>
>> By initializing the field in the Accum class you should be able to fix this
>> problem (e.g. String line = ""; instead of only String line;).
>>
>> Hope this helps,
>> Robin
>>
>> On Thu, Aug 16, 2018 at 12:14 PM Rui Wang <ruw...@google.com> wrote:
>>
>>> Sorry, I forgot to attach the PR link:
>>> https://github.com/apache/beam/pull/6154/files#diff-7358f3f0511940ea565e6584f652ed02R342
>>>
>>> -Rui
>>>
>>> On Thu, Aug 16, 2018 at 12:13 PM Rui Wang <ruw...@google.com> wrote:
>>>
>>>> Hi Mahesh,
>>>>
>>>> I think I had the same NPE when I explored a self-defined CombineFn. I
>>>> think your CombineFn might still need to define a coder to help Beam run it
>>>> in a distributed environment. Beam tries to invoke the coder somewhere and
>>>> then throws an NPE because none is defined.
>>>>
>>>> Here is a PR I wrote that defines an AccumulatingCombineFn and
>>>> implements a coder for it, for your reference.
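A hand-written accumulator coder mostly comes down to two functions: one that writes the accumulator's fields to an OutputStream, and one that reads them back from an InputStream. In Beam, that logic would live in a Coder subclass such as CustomCoder, overriding encode(value, OutputStream) and decode(InputStream), and the CombineFn would return it from getAccumulatorCoder. The exact wiring is in the PR linked in this thread and is not reproduced here. The sketch below is plain Java so that it runs without the SDK. AccumCodec and roundTrip are hypothetical names, and the length-prefixed UTF-8 layout is my own choice of encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class AccumCodec {

    // Accumulator with its field initialized, so a fresh one never holds null.
    public static class Accum {
        String line = "";
    }

    // Writes the line as a 4-byte length followed by its UTF-8 bytes.
    // The stream is flushed but not closed; the caller owns it.
    public static void encode(Accum value, OutputStream outStream) throws IOException {
        byte[] utf8 = value.line.getBytes(StandardCharsets.UTF_8);
        DataOutputStream out = new DataOutputStream(outStream);
        out.writeInt(utf8.length);
        out.write(utf8);
        out.flush();
    }

    // Reads back exactly the bytes that encode wrote.
    public static Accum decode(InputStream inStream) throws IOException {
        DataInputStream in = new DataInputStream(inStream);
        byte[] utf8 = new byte[in.readInt()];
        in.readFully(utf8);
        Accum accum = new Accum();
        accum.line = new String(utf8, StandardCharsets.UTF_8);
        return accum;
    }

    // Convenience helper: encode to memory, then decode again.
    public static Accum roundTrip(Accum value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            encode(value, bytes);
            return decode(new ByteArrayInputStream(bytes.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Accum accum = new Accum();
        accum.line = "first\nsecond";
        System.out.println(roundTrip(accum).line.equals(accum.line)); // prints true
    }
}
```

Compared with relying on Java serialization, an explicit encoding like this keeps the wire format small and under your control. The cost is that you must update it whenever the accumulator gains a field.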
>>>>
>>>> -Rui
>>>>
>>>> On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala <vangalamahe...@gmail.com> wrote:
>>>>
>>>>> Hello Robin -
>>>>>
>>>>> Thank you so much for your help.
>>>>> I added Serializable to Accum, and I got the following error. (Sorry
>>>>> for being a pain. I hope once I get past the initial hump ...)
>>>>>
>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>>
>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
>>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>>>
>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}
>>>>>
>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}
>>>>>
>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal.
>>>>> Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
>>>>>
>>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>>> Caused by: java.lang.NullPointerException
>>>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>>>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
>>>>>
>>>>> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <robi...@google.com> wrote:
>>>>>
>>>>>> Hello Mahesh,
>>>>>>
>>>>>> You can add "implements Serializable" to the Accum class; then it
>>>>>> should work.
>>>>>>
>>>>>> By the way, in Java String is immutable, so in order to change, for
>>>>>> example, accum.line, you need to write
>>>>>> accum.line = accum.line.concat(line).
>>>>>>
>>>>>> Best,
>>>>>> Robin
>>>>>>
>>>>>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <vangalamahe...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello all -
>>>>>>>
>>>>>>> I am trying to run a bare-bones Beam pipeline to understand the
>>>>>>> "combine" logic. I come from the Python world and am learning the Java
>>>>>>> Beam SDK because of my use case of ETL on a Spark cluster.
>>>>>>> So, pardon my grotesque Java code :)
>>>>>>>
>>>>>>> I'd appreciate it if you could nudge me in the right direction with
>>>>>>> this error (please see below).
>>>>>>>
>>>>>>> Here's my code (it reads lines from an input file and writes the same
>>>>>>> lines to an output file):
>>>>>>>
>>>>>>> public class VariantCaller
>>>>>>> {
>>>>>>>     public static void main(String[] args)
>>>>>>>     {
>>>>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>         Pipeline p = Pipeline.create(opts);
>>>>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>>>         PCollection<String> mergedLines = lines
>>>>>>>             .apply(Combine.globally(new AddLines()));
>>>>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>>>         p.run();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> AddLines class:
>>>>>>>
>>>>>>> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>>>>>>>
>>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>>
>>>>>>>     public static class Accum {
>>>>>>>         String line;
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public Accum createAccumulator() { return new Accum(); }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public Accum addInput(Accum accum, String line) {
>>>>>>>         accum.line.concat(line);
>>>>>>>         return accum;
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>>>>>         Accum merged = createAccumulator();
>>>>>>>         for (Accum accum : accums) {
>>>>>>>             merged.line.concat("\n").concat(accum.line);
>>>>>>>         }
>>>>>>>         return merged;
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public String extractOutput(Accum accum) {
>>>>>>>         return accum.line;
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>>>>>>>   No Coder has been manually specified; you may do so using .setCoder().
>>>>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>>>   See suppressed exceptions for detailed failures.
>>>>>>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>>>>>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>>>>>>>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>>>>>>>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>>>>>>>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>>>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>>>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>>>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>>>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
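Robin's two fixes can be checked without a runner: initialize the line field, and assign the result of concat because Strings are immutable. The sketch below is a plain-Java stand-in that mirrors the four CombineFn methods from the original question: createAccumulator, addInput, mergeAccumulators and extractOutput. It does not extend Beam's CombineFn, and LinesCombiner and join are hypothetical names. I also made two assumptions of my own. First, addInput separates lines with "\n" just as mergeAccumulators does. Second, the separator is skipped when either side is empty, so merging a fresh accumulator does not add a blank line. A side effect is that empty input lines are dropped.

```java
import java.util.Arrays;
import java.util.List;

public class LinesCombiner {

    // Accumulator with the field initialized, so it is never null (Robin's first fix).
    public static class Accum {
        String line = "";
    }

    // Joins two chunks with "\n", skipping the separator when either side is empty.
    static String join(String left, String right) {
        if (left.isEmpty()) return right;
        if (right.isEmpty()) return left;
        return left + "\n" + right;
    }

    public Accum createAccumulator() {
        return new Accum();
    }

    public Accum addInput(Accum accum, String line) {
        // Strings are immutable, so keep the returned value (Robin's second fix).
        accum.line = join(accum.line, line);
        return accum;
    }

    public Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            merged.line = join(merged.line, accum.line);
        }
        return merged;
    }

    public String extractOutput(Accum accum) {
        return accum.line;
    }

    public static void main(String[] args) {
        LinesCombiner fn = new LinesCombiner();
        // Simulate two bundles folded into separate accumulators, plus an empty one, then merged.
        Accum first = fn.addInput(fn.addInput(fn.createAccumulator(), "a"), "b");
        Accum second = fn.addInput(fn.createAccumulator(), "c");
        List<Accum> parts = Arrays.asList(first, second, fn.createAccumulator());
        System.out.println(fn.extractOutput(fn.mergeAccumulators(parts))); // prints a, b, c on separate lines
    }
}
```

One caveat applies when the same logic goes back into a real Combine.globally. Beam treats a PCollection as unordered and expects a CombineFn to be associative and commutative. String concatenation is not commutative, so the order of lines in test_out.csv may differ from test_in.csv.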