Sorry, I forgot to attach the PR link: https://github.com/apache/beam/pull/6154/files#diff-7358f3f0511940ea565e6584f652ed02R342
-Rui

On Thu, Aug 16, 2018 at 12:13 PM Rui Wang <ruw...@google.com> wrote:

> Hi Mahesh,
>
> I think I had the same NPE when I explored a self-defined combineFn. I think
> your combineFn might still need to define a coder to help Beam run it in a
> distributed environment. Beam tries to invoke the coder somewhere and then
> throws an NPE because none is defined.
>
> Here is a PR I wrote that defined an AccumulatingCombineFn and implemented
> a coder for it, for your reference.
>
> -Rui
>
> On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala <vangalamahe...@gmail.com>
> wrote:
>
>> Hello Robin -
>>
>> Thank you so much for your help.
>> I added Serializable to Accum, and I got the following error. (Sorry for
>> being a pain. I hope once I get past the initial hump ...)
>>
>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>
>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>
>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a
>> #structuralValue method which does not return true when the encoding of the
>> elements is equal.
>> Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}
>>
>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a
>> #structuralValue method which does not return true when the encoding of the
>> elements is equal.
>> Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}
>>
>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a
>> #structuralValue method which does not return true when the encoding of the
>> elements is equal.
>> Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
>>
>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>> Caused by: java.lang.NullPointerException
>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
>>
>> --
>> Mahesh Vangala
>> (Ph) 443-326-1957
>> (web) mvangala.com <http://mvangala.com>
>>
>>
>> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <robi...@google.com> wrote:
>>
>>> Hello Mahesh,
>>>
>>> You can add "implements Serializable" to the Accum class; then it should
>>> work.
>>>
>>> By the way, in Java String is immutable, so in order to change, for
>>> example, accum.line, you need to write accum.line = accum.line.concat(line).
>>>
>>> Best,
>>> Robin
>>>
>>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <vangalamahe...@gmail.com> wrote:
>>>
>>>> Hello all -
>>>>
>>>> I am trying to run a bare-bones Beam pipeline to understand the "combine"
>>>> logic. I come from the Python world and am trying to learn the Java Beam SDK
>>>> because my use case is ETL on a Spark cluster. So, pardon my grotesque Java code :)
>>>>
>>>> I'd appreciate it if you could nudge me onto the right path with this error
>>>> (please see below).
>>>>
>>>> Here's my code (it reads lines from an input file and writes the same lines
>>>> to an output file):
>>>>
>>>> public class VariantCaller
>>>> {
>>>>     public static void main(String[] args)
>>>>     {
>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>         Pipeline p = Pipeline.create(opts);
>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>         PCollection<String> mergedLines = lines.apply(Combine.globally(new AddLines()));
>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>         p.run();
>>>>     }
>>>> }
>>>>
>>>> AddLines class:
>>>>
>>>> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>>>>
>>>>     private static final long serialVersionUID = 1L;
>>>>
>>>>     public static class Accum {
>>>>         String line;
>>>>     }
>>>>
>>>>     @Override
>>>>     public Accum createAccumulator() { return new Accum(); }
>>>>
>>>>     @Override
>>>>     public Accum addInput(Accum accum, String line) {
>>>>         accum.line.concat(line);
>>>>         return accum;
>>>>     }
>>>>
>>>>     @Override
>>>>     public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>>         Accum merged = createAccumulator();
>>>>         for (Accum accum : accums) {
>>>>             merged.line.concat("\n").concat(accum.line);
>>>>         }
>>>>         return merged;
>>>>     }
>>>>
>>>>     @Override
>>>>     public String extractOutput(Accum accum) {
>>>>         return accum.line;
>>>>     }
>>>> }
>>>>
>>>> Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>>>>   No Coder has been manually specified; you may do so using .setCoder().
>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>>>   Building a Coder using a registered CoderProvider failed.
>>>>   See suppressed exceptions for detailed failures.
>>>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>>>>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>>>>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>>>>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
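Pulling the replies together, a corrected accumulator might look like the sketch below. The second stack trace fails inside AddLines.mergeAccumulators (AddLines.java:35); in the posted code createAccumulator() never sets Accum.line, so the most likely cause is merged.line.concat(...) being called on null. Both concat calls also throw their results away, because String is immutable (Robin's point). The sketch is plain Java with no Beam dependency so it can run on its own; the class name AddLinesSketch, the hasLines flag and the roundTrip helper are hypothetical, not part of Beam or of the code in this thread. In a real pipeline the four method bodies would go into AddLines, which extends CombineFn<String, AddLines.Accum, String>, with @Override on each, and Accum keeps "implements Serializable", the change after which the pipeline in this thread got past the coder error.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical plain-Java stand-in for the AddLines CombineFn (no Beam dependency).
// In a pipeline these methods would live in a class extending
// Combine.CombineFn<String, AddLines.Accum, String>, each marked @Override.
class AddLinesSketch {

    // Serializable, per Robin's suggestion, so the accumulator can be encoded.
    static class Accum implements Serializable {
        private static final long serialVersionUID = 1L;
        String line = "";         // start with "" instead of null so concat() is safe
        boolean hasLines = false; // hypothetical flag: "no input yet" vs. a blank input line
    }

    // String is immutable: concat() returns a new String, so assign the result back.
    private static void append(Accum accum, String text) {
        accum.line = accum.hasLines ? accum.line.concat("\n").concat(text) : text;
        accum.hasLines = true;
    }

    public Accum createAccumulator() {
        return new Accum();
    }

    public Accum addInput(Accum accum, String line) {
        append(accum, line);
        return accum;
    }

    public Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            if (accum.hasLines) { // skip accumulators that never received any input
                append(merged, accum.line);
            }
        }
        return merged;
    }

    public String extractOutput(Accum accum) {
        return accum.line;
    }

    // Copies an accumulator through Java serialization, the mechanism Beam's
    // SerializableCoder is built on; used here only to show Accum survives encoding.
    static Accum roundTrip(Accum accum) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(accum);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Accum) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        AddLinesSketch fn = new AddLinesSketch();
        // Two non-empty "bundles" and one empty one, as a runner might produce.
        Accum first = fn.addInput(fn.addInput(fn.createAccumulator(), "a"), "b");
        Accum second = fn.addInput(fn.createAccumulator(), "c");
        Accum empty = fn.createAccumulator();
        Accum merged = fn.mergeAccumulators(
            Arrays.asList(roundTrip(first), empty, roundTrip(second)));
        System.out.println(fn.extractOutput(merged)); // prints a, b and c on separate lines
    }
}
```

One design caveat: Combine is allowed to merge accumulators in any order, so the lines written to test_out.csv are not guaranteed to follow the input order. If you would rather declare the accumulator coder explicitly than rely on Serializable (closer in spirit to Rui's PR), CombineFn lets you override getAccumulatorCoder(CoderRegistry, Coder<InputT>) and return, for example, SerializableCoder.of(Accum.class); check the exact signature against the Beam Javadoc for your SDK version.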