Hi Robin,

I have the same question. If we don't implement a coder for class Accum
and only add "implements Serializable" to it, how is the accumulator
coder generated?

-Rui

On Fri, Aug 17, 2018 at 11:50 AM Mahesh Vangala <vangalamahe...@gmail.com>
wrote:

> Thanks, Robin.
> Based on Robin's comment above, I looked at the CombineFn test in the
> Beam git repo and implemented a getCoder method along the same lines. (
> https://github.com/vangalamaheshh/my-beam/blob/master/variant-caller/src/main/java/pipelines/variant_caller/AddLines.java
> )
> Do you think the getCoder implementation is unnecessary?
> Thanks for your help.
> Much appreciated!
>
> - Mahesh
>
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com <http://mvangala.com>*
>
>
> On Fri, Aug 17, 2018 at 1:01 PM Robin Qiu <robi...@google.com> wrote:
>
>> Hi Mahesh,
>>
>> I think you get the NullPointerException because your accumulator is not
>> initialized properly.
>>
>> In your createAccumulator() method, you create an Accum object without
>> setting its line field. So later, when accum.line is accessed, you get the
>> exception.
>>
>> Initializing the line field in the Accum class should fix this problem
>> (e.g. String line = ""; instead of only String line;).
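
A minimal sketch of the fix described above, written as plain static methods so it runs without Beam on the classpath. LineCombiner is a hypothetical name; in the real pipeline these would be the overridden methods of the CombineFn subclass. Joining with a newline only between non-empty accumulators is an assumption about the intended output, not something stated in the thread.

```java
import java.io.Serializable;
import java.util.Arrays;

// Sketch only: mirrors the four CombineFn methods from the thread as static
// methods. LineCombiner is a hypothetical name, not part of Beam.
final class LineCombiner {

    static class Accum implements Serializable {
        private static final long serialVersionUID = 1L;
        String line = ""; // initialized, so concat never runs on null
    }

    static Accum createAccumulator() {
        return new Accum();
    }

    static Accum addInput(Accum accum, String line) {
        // Strings are immutable, so the result of concat must be stored.
        accum.line = accum.line.concat(line);
        return accum;
    }

    static Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            if (accum.line.isEmpty()) {
                continue; // assumption: empty accumulators add nothing
            }
            merged.line = merged.line.isEmpty()
                ? accum.line
                : merged.line.concat("\n").concat(accum.line);
        }
        return merged;
    }

    static String extractOutput(Accum accum) {
        return accum.line;
    }

    public static void main(String[] args) {
        Accum first = addInput(createAccumulator(), "x");
        Accum second = addInput(createAccumulator(), "y");
        System.out.println(extractOutput(mergeAccumulators(Arrays.asList(first, second))));
    }
}
```

With the field initialized to an empty string, merging an accumulator that never received input no longer throws.
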
>>
>> Hope this helps,
>> Robin
>>
>> On Thu, Aug 16, 2018 at 12:14 PM Rui Wang <ruw...@google.com> wrote:
>>
>>> Sorry I forgot to attach the PR link:
>>> https://github.com/apache/beam/pull/6154/files#diff-7358f3f0511940ea565e6584f652ed02R342
>>>
>>> -Rui
>>>
>>> On Thu, Aug 16, 2018 at 12:13 PM Rui Wang <ruw...@google.com> wrote:
>>>
>>>> Hi Mahesh,
>>>>
>>>> I think I had the same NPE when I explored a self-defined CombineFn. I
>>>> think your CombineFn might still need to define a coder so that Beam can
>>>> run it in a distributed environment. Beam tries to invoke the coder
>>>> somewhere and then throws an NPE because none is defined.
>>>>
>>>> Here is a PR I wrote that defines an AccumulatingCombineFn and
>>>> implements a coder for it, for your reference.
>>>>
>>>> -Rui
>>>>
>>>> On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala <
>>>> vangalamahe...@gmail.com> wrote:
>>>>
>>>>> Hello Robin -
>>>>>
>>>>> Thank you so much for your help.
>>>>> I added Serializable to Accum, and I got the following error. (Sorry
>>>>> for being a pain. I hope once I get past the initial hump ...)
>>>>>
>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
>>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}
>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}
>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
>>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>>> Caused by: java.lang.NullPointerException
>>>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>>>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
>>>>>
>>>>>
>>>>> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <robi...@google.com> wrote:
>>>>>
>>>>>> Hello Mahesh,
>>>>>>
>>>>>> You can add "implements Serializable" to the Accum class, then it
>>>>>> should work.
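
One likely reason this works, stated as an assumption rather than a confirmed description of Beam internals: when no accumulator coder is given explicitly, Beam tries to infer one from the accumulator type, and for a type that implements Serializable it can fall back to a coder built on standard Java serialization. The sketch below uses only java.io to show that kind of encode/decode round trip; SerializationRoundTrip, encode, and decode are illustrative names, and this is not Beam's actual code path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Sketch only: plain Java serialization, no Beam classes involved.
final class SerializationRoundTrip {

    static class Accum implements Serializable {
        private static final long serialVersionUID = 1L;
        String line = "";
    }

    // Turns a Serializable value into bytes, as a coder's encode step would.
    static byte[] encode(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rebuilds the value from bytes, as a coder's decode step would.
    static Object decode(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("decode failed", e);
        }
    }

    public static void main(String[] args) {
        Accum original = new Accum();
        original.line = "chr1,100";
        Accum copy = (Accum) decode(encode(original));
        System.out.println(copy.line); // prints "chr1,100"
    }
}
```

Without "implements Serializable" on Accum, the call to encode would not compile here, which loosely parallels Beam being unable to find a default coder for the type.
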
>>>>>>
>>>>>> By the way, Strings are immutable in Java, so to change accum.line,
>>>>>> for example, you need to write
>>>>>> accum.line = accum.line.concat(line).
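
The point about immutability can be checked in isolation. This is a small sketch in plain Java with illustrative names (StringConcatDemo, discardResult, keepResult); it only demonstrates that concat returns a new String and leaves the receiver unchanged.

```java
// Sketch only: plain Java, no Beam dependency. Names are illustrative.
final class StringConcatDemo {

    // Calls concat but drops the result, so the returned value is unchanged.
    static String discardResult(String base, String suffix) {
        base.concat(suffix); // the new String is thrown away
        return base;
    }

    // Stores the result of concat, which is what actually "appends".
    static String keepResult(String base, String suffix) {
        base = base.concat(suffix);
        return base;
    }

    public static void main(String[] args) {
        System.out.println(discardResult("a,b", ",c")); // prints "a,b"
        System.out.println(keepResult("a,b", ",c"));    // prints "a,b,c"
    }
}
```
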
>>>>>>
>>>>>> Best,
>>>>>> Robin
>>>>>>
>>>>>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <
>>>>>> vangalamahe...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello all -
>>>>>>>
>>>>>>> I am trying to run a bare-bones Beam pipeline to understand the
>>>>>>> "combine" logic. I come from the Python world and am learning the Java
>>>>>>> Beam SDK because my use case is ETL on a Spark cluster. So, pardon my
>>>>>>> grotesque Java code :)
>>>>>>>
>>>>>>> I would appreciate it if you could nudge me onto the right path with
>>>>>>> this error (please see below).
>>>>>>>
>>>>>>> Here's my code (it reads lines from an input file and writes the same
>>>>>>> lines to an output file):
>>>>>>>
>>>>>>> public class VariantCaller
>>>>>>> {
>>>>>>>     public static void main( String[] args )
>>>>>>>     {
>>>>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>         Pipeline p = Pipeline.create(opts);
>>>>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>>>         PCollection<String> mergedLines = lines.apply(Combine.globally(new AddLines()));
>>>>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>>>         p.run();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> AddLines Class:
>>>>>>>
>>>>>>>
>>>>>>> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>>>>>>>
>>>>>>>   private static final long serialVersionUID = 1L;
>>>>>>>
>>>>>>>   public static class Accum {
>>>>>>>     String line;
>>>>>>>   }
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public Accum createAccumulator() { return new Accum(); }
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public Accum addInput(Accum accum, String line) {
>>>>>>>       accum.line.concat(line);
>>>>>>>       return accum;
>>>>>>>   }
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>>>>>     Accum merged = createAccumulator();
>>>>>>>     for (Accum accum : accums) {
>>>>>>>       merged.line.concat("\n").concat(accum.line);
>>>>>>>     }
>>>>>>>     return merged;
>>>>>>>   }
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public String extractOutput(Accum accum) {
>>>>>>>     return accum.line;
>>>>>>>   }
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>>>>>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>>>   See suppressed exceptions for detailed failures.
>>>>>>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>>>>>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>>>>>>>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>>>>>>>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>>>>>>>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>>>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>>>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>>>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>>>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
>>>>>>>
>>>>>>
