Hi Mahesh,

I think you have the NullPointerException because your Accumulator is not
initialized properly.

In your createAccumulator() method, you create an Accum object without
setting its line field, so the field is still null. When accum.line is
accessed later, you get the exception.
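
To illustrate the failure mode, here is a minimal plain-Java sketch with no
Beam dependency. The class names UninitializedAccum and NpeDemo are made up
for this example; only the uninitialized String field mirrors your code.

```java
// Plain-Java illustration only; these class names are hypothetical.
class UninitializedAccum {
    String line; // reference-type fields default to null when not initialized
}

class NpeDemo {
    // Returns true if calling concat on the uninitialized field throws an NPE.
    static boolean throwsNpe() {
        UninitializedAccum accum = new UninitializedAccum();
        try {
            accum.line.concat("abc"); // accum.line is null here
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE thrown: " + throwsNpe());
    }
}
```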

Initializing the line field in the Accum class should fix this problem
(e.g. String line = ""; instead of only String line;).
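
As a rough sketch of what the fixed logic could look like, here are the four
combine steps written as plain static methods so the snippet runs without
Beam. AccumSketch and AddLinesSketch are hypothetical names; in your pipeline
these would be the overridden methods of your CombineFn subclass. It also
applies the earlier point about reassigning the result of concat().

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the Accum class, with the field initialized.
class AccumSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    String line = ""; // initialized, so concat() never sees null
}

// Hypothetical stand-in for AddLines; static methods instead of CombineFn overrides.
class AddLinesSketch {
    static AccumSketch createAccumulator() {
        return new AccumSketch();
    }

    static AccumSketch addInput(AccumSketch accum, String line) {
        // String is immutable: concat() returns a new String, so reassign it.
        accum.line = accum.line.concat(line);
        return accum;
    }

    static AccumSketch mergeAccumulators(Iterable<AccumSketch> accums) {
        AccumSketch merged = createAccumulator();
        for (AccumSketch accum : accums) {
            // Same merge logic as the original, so each piece is prefixed
            // with a newline (including the first one).
            merged.line = merged.line.concat("\n").concat(accum.line);
        }
        return merged;
    }

    static String extractOutput(AccumSketch accum) {
        return accum.line;
    }

    public static void main(String[] args) {
        AccumSketch a = addInput(createAccumulator(), "first");
        AccumSketch b = addInput(createAccumulator(), "second");
        List<AccumSketch> parts = Arrays.asList(a, b);
        System.out.println(extractOutput(mergeAccumulators(parts)));
    }
}
```

This is only a sketch in plain Java and has not been tested against Beam
itself.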

Hope this helps,
Robin

On Thu, Aug 16, 2018 at 12:14 PM Rui Wang <ruw...@google.com> wrote:

> Sorry I forgot to attach the PR link:
> https://github.com/apache/beam/pull/6154/files#diff-7358f3f0511940ea565e6584f652ed02R342
>
> -Rui
>
> On Thu, Aug 16, 2018 at 12:13 PM Rui Wang <ruw...@google.com> wrote:
>
>> Hi Mahesh,
>>
>> I think I had the same NPE when I explored a custom CombineFn. I
>> think your CombineFn might still need to define a coder to help Beam run
>> it in a distributed environment. Beam tries to invoke the coder somewhere
>> and then throws an NPE because none is defined.
>>
>> Here is a PR I wrote that defines an AccumulatingCombineFn and implements
>> a coder for it, for your reference.
>>
>> -Rui
>>
>> On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala <vangalamahe...@gmail.com>
>> wrote:
>>
>>> Hello Robin -
>>>
>>> Thank you so much for your help.
>>> I added Serializable to Accum, and I got the following error. (Sorry
>>> for being a pain. I hope once I get past the initial hump ...)
>>>
>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}
>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}
>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
>>>
>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>> Caused by: java.lang.NullPointerException
>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
>>>
>>> *--*
>>> *Mahesh Vangala*
>>> *(Ph) 443-326-1957*
>>> *(web) mvangala.com <http://mvangala.com>*
>>>
>>>
>>> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <robi...@google.com> wrote:
>>>
>>>> Hello Mahesh,
>>>>
>>>> You can add "implements Serializable" to the Accum class, then it
>>>> should work.
>>>>
>>>> By the way, Java Strings are immutable: concat() returns a new String
>>>> rather than modifying the original. So to change accum.line, for
>>>> example, you need to write accum.line = accum.line.concat(line).
>>>>
>>>> Best,
>>>> Robin
>>>>
>>>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <
>>>> vangalamahe...@gmail.com> wrote:
>>>>
>>>>> Hello all -
>>>>>
>>>>> I am trying to run a bare-bones Beam pipeline to understand the
>>>>> "combine" logic. I come from the Python world and am learning the Java
>>>>> Beam SDK because my use case is ETL on a Spark cluster. So, pardon my
>>>>> grotesque Java code :)
>>>>>
>>>>> I would appreciate it if you could nudge me onto the right path with
>>>>> this error (please see below).
>>>>>
>>>>> Here's my code (it reads lines from an input file and writes the same
>>>>> lines to an output file):
>>>>>
>>>>> public class VariantCaller
>>>>> {
>>>>>     public static void main( String[] args )
>>>>>     {
>>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>         Pipeline p = Pipeline.create(opts);
>>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>         PCollection<String> mergedLines = lines.apply(Combine.globally(new AddLines()));
>>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>         p.run();
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> AddLines Class:
>>>>>
>>>>>
>>>>> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>>>>>   /**
>>>>>    *
>>>>>    */
>>>>>   private static final long serialVersionUID = 1L;
>>>>>
>>>>>   public static class Accum {
>>>>>     String line;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public Accum createAccumulator() { return new Accum(); }
>>>>>
>>>>>   @Override
>>>>>   public Accum addInput(Accum accum, String line) {
>>>>>       accum.line.concat(line);
>>>>>       return accum;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>>>     Accum merged = createAccumulator();
>>>>>     for (Accum accum : accums) {
>>>>>       merged.line.concat("\n").concat(accum.line);
>>>>>     }
>>>>>     return merged;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public String extractOutput(Accum accum) {
>>>>>     return accum.line;
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>>>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>   See suppressed exceptions for detailed failures.
>>>>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>>>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>>>>>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>>>>>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>>>>>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
>>>>>
>>>>> *--*
>>>>> *Mahesh Vangala*
>>>>> *(Ph) 443-326-1957*
>>>>> *(web) mvangala.com <http://mvangala.com>*
>>>>>
>>>>
