Sorry, I forgot to attach the PR link:
https://github.com/apache/beam/pull/6154/files#diff-7358f3f0511940ea565e6584f652ed02R342

-Rui

On Thu, Aug 16, 2018 at 12:13 PM Rui Wang <ruw...@google.com> wrote:

> Hi Mahesh,
>
> I think I had the same NPE when I explored a self-defined CombineFn. I think
> your CombineFn might still need to define a coder to help Beam run it in a
> distributed environment. Beam tries to invoke the coder somewhere and then
> throws an NPE because none is defined.
>
> Here is a PR I wrote that defines an AccumulatingCombineFn and implements
> a coder for it, for your reference.
>
> -Rui
>
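Rui's point is that Beam has to turn each accumulator into bytes (for example, when partial results move between workers), so the accumulator type needs a coder. The class below is a minimal, Beam-free sketch of that encode/decode contract; `AccumCoderSketch` and its nested `Accum` are hypothetical names, and the wire format (a null flag followed by length-prefixed UTF-8) is an assumption, not what Beam or the linked PR uses. In Beam itself, the same two methods would typically live in a subclass of `CustomCoder<Accum>` that the CombineFn returns from a `getAccumulatorCoder(CoderRegistry, Coder<InputT>)` override.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical, Beam-free sketch of a coder for an AddLines-style accumulator.
class AccumCoderSketch {
  // Hypothetical stand-in for AddLines.Accum; line may be null.
  static class Accum {
    String line;
  }

  // Writes a presence flag for line, then its UTF-8 bytes prefixed by their length.
  static void encode(Accum value, OutputStream outStream) throws IOException {
    DataOutputStream out = new DataOutputStream(outStream);
    out.writeBoolean(value.line != null);
    if (value.line != null) {
      byte[] bytes = value.line.getBytes(StandardCharsets.UTF_8);
      out.writeInt(bytes.length);
      out.write(bytes);
    }
    // Flush but do not close: the caller owns the underlying stream.
    out.flush();
  }

  // Reads back exactly what encode wrote.
  static Accum decode(InputStream inStream) throws IOException {
    DataInputStream in = new DataInputStream(inStream);
    Accum accum = new Accum();
    if (in.readBoolean()) {
      byte[] bytes = new byte[in.readInt()];
      in.readFully(bytes);
      accum.line = new String(bytes, StandardCharsets.UTF_8);
    }
    return accum;
  }

  // Encodes to an in-memory buffer and decodes it again.
  static Accum roundTrip(Accum value) {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      encode(value, buffer);
      return decode(new ByteArrayInputStream(buffer.toByteArray()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A correct coder must round-trip: decoding the bytes of an `Accum` whose `line` is `"chr1,100\nchr2,200"` gives back the same text, and a null `line` comes back as null instead of throwing.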
> On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala <vangalamahe...@gmail.com>
> wrote:
>
>> Hello Robin -
>>
>> Thank you so much for your help.
>> I added Serializable to Accum, and I got the following error. (Sorry for
>> being a pain. I hope once I get past the initial hump ...)
>>
>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>
>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>
>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@52449030}
>>
>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@59bb25e2}
>>
>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element KV{null, pipelines.variant_caller.AddLines$Accum@7076d18e}
>>
>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>> Caused by: java.lang.NullPointerException
>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>>     at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
>>
>> *--*
>> *Mahesh Vangala*
>> *(Ph) 443-326-1957*
>> *(web) mvangala.com <http://mvangala.com>*
>>
>>
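The NPE above most likely comes from the accumulator's starting state. In the AddLines code quoted further down, `createAccumulator()` returns an `Accum` whose `line` field is still `null` (the Java default for an object field), so `merged.line.concat(...)` in `mergeAccumulators` dereferences null. The log also shows the input split into 9 bundles, which is why a runner may build several partial accumulators and then merge them. The sketch below is a standalone, Beam-free mirror of the same four CombineFn methods with `line` initialized to `""` and every `concat`/join result kept. `AddLinesSketch`, `join`, and `combineInBundles` are hypothetical helpers, and `combineInBundles` only approximates a runner: it bundles by element count rather than bytes and merges in input order, which Beam does not guarantee.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Beam-free mirror of the AddLines CombineFn methods.
class AddLinesSketch {
  static class Accum {
    // Start from "" instead of null so adding and merging never dereference null.
    String line = "";
  }

  static Accum createAccumulator() {
    return new Accum();
  }

  static Accum addInput(Accum accum, String input) {
    // Strings are immutable: store the joined result back into the accumulator.
    accum.line = join(accum.line, input);
    return accum;
  }

  static Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.line = join(merged.line, accum.line);
    }
    return merged;
  }

  static String extractOutput(Accum accum) {
    return accum.line;
  }

  // Joins with "\n", skipping empty sides so fresh accumulators add no blank lines
  // (note: this also drops empty input lines).
  static String join(String left, String right) {
    if (left.isEmpty()) return right;
    if (right.isEmpty()) return left;
    return left + "\n" + right;
  }

  // Simulates a runner: one accumulator per bundle of elements, then a single merge.
  static String combineInBundles(List<String> lines, int bundleSize) {
    List<Accum> partials = new ArrayList<>();
    for (int start = 0; start < lines.size(); start += bundleSize) {
      Accum accum = createAccumulator();
      for (String line : lines.subList(start, Math.min(start + bundleSize, lines.size()))) {
        accum = addInput(accum, line);
      }
      partials.add(accum);
    }
    return extractOutput(mergeAccumulators(partials));
  }
}
```

With five lines and a bundle size of 2, three partial accumulators are built and merged; an empty input merges to `""` rather than throwing.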
>> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu <robi...@google.com> wrote:
>>
>>> Hello Mahesh,
>>>
>>> You can add "implements Serializable" to the Accum class; then it should
>>> work.
>>>
>>> By the way, in Java, String is immutable, so to change accum.line, for
>>> example, you need to write accum.line = accum.line.concat(line).
>>>
>>> Best,
>>> Robin
>>>
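Both of Robin's suggestions can be checked in plain Java. The sketch below (class and method names are hypothetical, and it does not use Beam) shows that calling `concat` without keeping its return value leaves the original `String` unchanged, and round-trips a `Serializable` accumulator through `ObjectOutputStream`/`ObjectInputStream`, the Java serialization mechanism that Beam's `SerializableCoder` is built on.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch of Robin's two suggestions, using only the JDK.
class SerializableAccumSketch {
  // Accum with "implements Serializable", as suggested.
  static class Accum implements Serializable {
    private static final long serialVersionUID = 1L;
    String line = "";
  }

  // Shows the bug: concat returns a new String, and discarding it changes nothing.
  static String concatDiscarded(String s, String suffix) {
    s.concat(suffix);
    return s;
  }

  // The fix: keep the String that concat returns.
  static Accum addInput(Accum accum, String input) {
    accum.line = accum.line.concat(input);
    return accum;
  }

  // Serializes the accumulator to bytes and reads back a new copy.
  static Accum roundTrip(Accum accum) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(accum);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (Accum) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

`concatDiscarded("a", "b")` still returns `"a"`, while two `addInput` calls with `"chr1"` and `",100"` produce `"chr1,100"`, and the deserialized copy carries the same text.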
>>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <
>>> vangalamahe...@gmail.com> wrote:
>>>
>>>> Hello all -
>>>>
>>>> I am trying to run a bare-bones Beam pipeline to understand the "combine"
>>>> logic. I come from the Python world and am learning the Java Beam SDK
>>>> because my use case is ETL on a Spark cluster, so pardon my grotesque
>>>> Java code :)
>>>>
>>>> I would appreciate it if you could nudge me in the right direction with
>>>> this error (please see below).
>>>>
>>>> Here's my code (it reads lines from an input file and writes the same
>>>> lines to an output file):
>>>>
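>>>> import org.apache.beam.sdk.Pipeline;
>>>> import org.apache.beam.sdk.io.TextIO;
>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>> import org.apache.beam.sdk.transforms.Combine;
>>>> import org.apache.beam.sdk.values.PCollection;
>>>>
>>>> public class VariantCaller {
>>>>     public static void main(String[] args) {
>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>         Pipeline p = Pipeline.create(opts);
>>>>         // Each line of the input file becomes one String element.
>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>         // Combine all lines into a single String with the AddLines CombineFn.
>>>>         PCollection<String> mergedLines = lines.apply(Combine.globally(new AddLines()));
>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>         p.run();
>>>>     }
>>>> }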
>>>>
>>>>
>>>> AddLines class:
>>>>
>>>> import org.apache.beam.sdk.transforms.Combine.CombineFn;
>>>>
>>>> public class AddLines extends CombineFn<String, AddLines.Accum, String> {
>>>>
>>>>   private static final long serialVersionUID = 1L;
>>>>
>>>>   public static class Accum {
>>>>     String line;
>>>>   }
>>>>
>>>>   @Override
>>>>   public Accum createAccumulator() { return new Accum(); }
>>>>
>>>>   @Override
>>>>   public Accum addInput(Accum accum, String line) {
>>>>       accum.line.concat(line);
>>>>       return accum;
>>>>   }
>>>>
>>>>   @Override
>>>>   public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>>     Accum merged = createAccumulator();
>>>>     for (Accum accum : accums) {
>>>>       merged.line.concat("\n").concat(accum.line);
>>>>     }
>>>>     return merged;
>>>>   }
>>>>
>>>>   @Override
>>>>   public String extractOutput(Accum accum) {
>>>>     return accum.line;
>>>>   }
>>>> }
>>>>
>>>>
>>>> Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(AddLines)/Combine.perKey(AddLines)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
>>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
>>>>   Building a Coder using a registered CoderProvider failed.
>>>>   See suppressed exceptions for detailed failures.
>>>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
>>>>     at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:277)
>>>>     at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:114)
>>>>     at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1074)
>>>>     at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:943)
>>>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>>>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>>>>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:27)
>>>>
>>>>
>>>
